1use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
/// Number of range partitions generated by [`partition_rule_for_hexstring`].
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;

/// Upper limit on the partition count: the full keyspace of a 4-hex-digit prefix (16^4).
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 1 << 16;
26
/// Builds the half-open range predicate `$col >= '<left>' AND $col < '<right>'`.
///
/// `$col` is expanded twice and cloned into each comparison. Both bounds are turned
/// into single-quoted string literals via `to_string()`.
macro_rules! between_string {
    ($col: expr, $left_incl: expr, $right_excl: expr) => {{
        // Inclusive lower bound.
        let lower = Expr::BinaryOp {
            left: Box::new($col.clone()),
            op: BinaryOperator::GtEq,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($left_incl.to_string()).into(),
            )),
        };
        // Exclusive upper bound.
        let upper = Expr::BinaryOp {
            left: Box::new($col.clone()),
            op: BinaryOperator::Lt,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($right_excl.to_string()).into(),
            )),
        };
        Expr::BinaryOp {
            left: Box::new(lower),
            op: BinaryOperator::And,
            right: Box::new(upper),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
50 Ok(Partitions {
51 column_list: vec![Ident::new(ident)],
52 exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
53 })
54}
55
56fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
58 ensure!(
59 partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
60 InvalidPartitionNumberSnafu { partition_num }
61 );
62
63 let ident_expr = Expr::Identifier(Ident::new(ident).clone());
64
65 let (total_partitions, hex_length) = {
66 match partition_num {
67 2..=16 => (16, 1),
68 17..=256 => (256, 2),
69 257..=4096 => (4096, 3),
70 4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
71 _ => unreachable!(),
72 }
73 };
74
75 let partition_size = total_partitions / partition_num;
76 let remainder = total_partitions % partition_num;
77
78 let mut rules = Vec::new();
79 let mut current_boundary = 0;
80 for i in 0..partition_num {
81 let mut size = partition_size;
82 if i < remainder {
83 size += 1;
84 }
85 let start = current_boundary;
86 let end = current_boundary + size;
87
88 if i == 0 {
89 rules.push(Expr::BinaryOp {
91 left: Box::new(ident_expr.clone()),
92 op: BinaryOperator::Lt,
93 right: Box::new(Expr::Value(
94 Value::SingleQuotedString(format!("{:0hex_length$x}", end)).into(),
95 )),
96 });
97 } else if i == partition_num - 1 {
98 rules.push(Expr::BinaryOp {
100 left: Box::new(ident_expr.clone()),
101 op: BinaryOperator::GtEq,
102 right: Box::new(Expr::Value(
103 Value::SingleQuotedString(format!("{:0hex_length$x}", start)).into(),
104 )),
105 });
106 } else {
107 rules.push(between_string!(
109 ident_expr,
110 format!("{:0hex_length$x}", start),
111 format!("{:0hex_length$x}", end)
112 ));
113 }
114
115 current_boundary = end;
116 }
117
118 Ok(rules)
119}
120
#[cfg(test)]
mod tests {
    use sqlparser::ast::{Expr, ValueWithSpan};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    /// Random v4 UUIDs should spread evenly across each tested partition count.
    #[test]
    fn test_partition_rules_for_uuid() {
        let cases: [(u32, usize); 8] = [
            (2, 10_000),
            (4, 10_000),
            (8, 10_000),
            (16, 10_000),
            (32, 10_000),
            (64, 100_000),
            (128, 100_000),
            (256, 100_000),
        ];
        for (partitions, samples) in cases {
            assert!(
                check_distribution(partitions, samples),
                "uneven distribution for {partitions} partitions over {samples} UUIDs"
            );
        }
    }

    /// The default rule set matches the expected SQL predicates exactly.
    #[test]
    fn test_rules() {
        let expr = vec![
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ];

        let dialect = GenericDialect {};
        let results = expr
            .into_iter()
            .map(|s| {
                let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
                parser.parse_expr().unwrap()
            })
            .collect::<Vec<Expr>>();

        assert_eq!(
            results,
            partition_rule_for_hexstring("trace_id").unwrap().exprs
        );
    }

    /// Partition counts that are not a power of two in `2..=65536` are rejected.
    #[test]
    fn test_invalid_partition_numbers() {
        for partition_num in [0, 1, 3, 6, 100, 65_535, 131_072] {
            assert!(
                partition_rules_for_uuid(partition_num, "trace_id").is_err(),
                "partition_num {partition_num} should be rejected"
            );
        }
    }

    /// Every prefix length yields the right rule count, fixed-width boundaries,
    /// and strictly increasing upper bounds (which `partition_point` relies on).
    #[test]
    fn test_rule_boundaries() {
        let cases: [(u32, usize); 8] = [
            (2, 1),
            (16, 1),
            (32, 2),
            (256, 2),
            (512, 3),
            (4096, 3),
            (8192, 4),
            (65_536, 4),
        ];
        for (partition_num, hex_len) in cases {
            let rules = partition_rules_for_uuid(partition_num, "trace_id").unwrap();
            assert_eq!(rules.len(), partition_num as usize);

            let upper_bounds = upper_bounds_from_rules(&rules);
            assert_eq!(upper_bounds.len(), partition_num as usize - 1);
            assert!(upper_bounds.iter().all(|bound| bound.len() == hex_len));
            assert!(upper_bounds.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }

    /// Returns `true` when `test_uuid_num` random UUIDs land in each of the
    /// `test_partition` partitions within ±30% of the ideal share.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);

        let mut counts = vec![0usize; test_partition as usize];
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().simple().to_string();
            counts[allocate_partition_for_uuid(uuid.as_str(), &upper_bounds)] += 1;
        }

        // Work in percentages: each partition ideally receives 100 / n percent.
        let expected_ratio = 100.0 / test_partition as f64;
        let tolerance = expected_ratio * 0.30;

        counts.iter().all(|&count| {
            let ratio = (count as f64 / test_uuid_num as f64) * 100.0;
            (ratio - expected_ratio).abs() < tolerance
        })
    }

    /// Extracts the upper bound of every rule except the last (which is open above).
    ///
    /// It also asserts that the rules are contiguous: the first has only an upper bound,
    /// the last has only a lower bound, and each lower bound equals the previous upper.
    fn upper_bounds_from_rules<'a>(rules: &'a [Expr]) -> Vec<&'a str> {
        let mut upper_bounds = Vec::with_capacity(rules.len().saturating_sub(1));
        let mut prev_upper: Option<&'a str> = None;

        for (idx, rule) in rules.iter().enumerate() {
            let (lower, upper) = extract_rule_bounds(rule);
            match idx {
                0 => {
                    assert!(lower.is_none());
                    assert!(upper.is_some());
                }
                idx if idx == rules.len() - 1 => {
                    assert_eq!(lower, prev_upper);
                    assert!(upper.is_none());
                }
                _ => {
                    assert_eq!(lower, prev_upper);
                    assert!(upper.is_some());
                }
            }

            if idx < rules.len() - 1 {
                upper_bounds.push(upper.unwrap());
            }

            prev_upper = upper;
        }

        upper_bounds
    }

    /// Returns the `(lower, upper)` string bounds of a rule.
    ///
    /// It recognises the three rule shapes produced above: `< x`, `>= x`, and
    /// `>= x AND < y`. Any other expression yields `(None, None)`.
    fn extract_rule_bounds(rule: &Expr) -> (Option<&str>, Option<&str>) {
        /// Returns the contents of a single-quoted string literal, if `expr` is one.
        fn extract_single_quoted_string(expr: &Expr) -> Option<&str> {
            match expr {
                Expr::Value(ValueWithSpan {
                    value: Value::SingleQuotedString(value),
                    ..
                }) => Some(value.as_str()),
                _ => None,
            }
        }

        match rule {
            Expr::BinaryOp {
                op: BinaryOperator::Lt,
                right,
                ..
            } => (None, extract_single_quoted_string(right)),
            Expr::BinaryOp {
                op: BinaryOperator::GtEq,
                right,
                ..
            } => (extract_single_quoted_string(right), None),
            Expr::BinaryOp {
                op: BinaryOperator::And,
                left,
                right,
            } => {
                let lower = match left.as_ref() {
                    Expr::BinaryOp {
                        op: BinaryOperator::GtEq,
                        right,
                        ..
                    } => extract_single_quoted_string(right),
                    _ => None,
                };
                let upper = match right.as_ref() {
                    Expr::BinaryOp {
                        op: BinaryOperator::Lt,
                        right,
                        ..
                    } => extract_single_quoted_string(right),
                    _ => None,
                };
                (lower, upper)
            }
            _ => (None, None),
        }
    }

    /// Index of the partition containing `uuid`: the number of upper bounds it is >= to.
    /// `upper_bounds` must be sorted in ascending order.
    fn allocate_partition_for_uuid(uuid: &str, upper_bounds: &[&str]) -> usize {
        upper_bounds.partition_point(|upper| uuid >= *upper)
    }

    /// The default 16-way rule set exposes 15 finite upper bounds.
    #[test]
    fn test_extract_rule_bounds() {
        let rules = partition_rules_for_uuid(16, "trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);
        assert_eq!(upper_bounds.len(), 15);
    }
}
300}