1use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
/// Partition count used by `partition_rule_for_hexstring`: one partition per
/// leading hex digit.
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;

/// Largest supported partition count for hex-string columns. It equals 16^4,
/// the number of distinct four-hex-digit prefixes.
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 1 << 16;
26
/// Expands to the half-open range predicate
/// `$col >= '$left_incl' AND $col < '$right_excl'`.
///
/// Each bound is turned into a single-quoted string literal via `to_string()`.
/// `$col` is cloned into both comparisons. `Expr`, `BinaryOperator` and `Value`
/// are resolved at the call site.
macro_rules! between_string {
    ($col: expr, $left_incl: expr, $right_excl: expr) => {{
        // Inclusive lower bound: `$col >= left`.
        let lower_bound = Expr::BinaryOp {
            op: BinaryOperator::GtEq,
            left: Box::new($col.clone()),
            right: Box::new(Expr::Value(Value::SingleQuotedString(
                $left_incl.to_string(),
            ))),
        };
        // Exclusive upper bound: `$col < right`.
        let upper_bound = Expr::BinaryOp {
            op: BinaryOperator::Lt,
            left: Box::new($col.clone()),
            right: Box::new(Expr::Value(Value::SingleQuotedString(
                $right_excl.to_string(),
            ))),
        };
        Expr::BinaryOp {
            op: BinaryOperator::And,
            left: Box::new(lower_bound),
            right: Box::new(upper_bound),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
50 Ok(Partitions {
51 column_list: vec![Ident::new(ident)],
52 exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
53 })
54}
55
56fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
58 ensure!(
59 partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
60 InvalidPartitionNumberSnafu { partition_num }
61 );
62
63 let ident_expr = Expr::Identifier(Ident::new(ident).clone());
64
65 let (total_partitions, hex_length) = {
66 match partition_num {
67 2..=16 => (16, 1),
68 17..=256 => (256, 2),
69 257..=4096 => (4096, 3),
70 4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
71 _ => unreachable!(),
72 }
73 };
74
75 let partition_size = total_partitions / partition_num;
76 let remainder = total_partitions % partition_num;
77
78 let mut rules = Vec::new();
79 let mut current_boundary = 0;
80 for i in 0..partition_num {
81 let mut size = partition_size;
82 if i < remainder {
83 size += 1;
84 }
85 let start = current_boundary;
86 let end = current_boundary + size;
87
88 if i == 0 {
89 rules.push(Expr::BinaryOp {
91 left: Box::new(ident_expr.clone()),
92 op: BinaryOperator::Lt,
93 right: Box::new(Expr::Value(Value::SingleQuotedString(format!(
94 "{:0hex_length$x}",
95 end
96 )))),
97 });
98 } else if i == partition_num - 1 {
99 rules.push(Expr::BinaryOp {
101 left: Box::new(ident_expr.clone()),
102 op: BinaryOperator::GtEq,
103 right: Box::new(Expr::Value(Value::SingleQuotedString(format!(
104 "{:0hex_length$x}",
105 start
106 )))),
107 });
108 } else {
109 rules.push(between_string!(
111 ident_expr,
112 format!("{:0hex_length$x}", start),
113 format!("{:0hex_length$x}", end)
114 ));
115 }
116
117 current_boundary = end;
118 }
119
120 Ok(rules)
121}
122
#[cfg(test)]
mod tests {
    use sqlparser::ast::Expr;
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    #[test]
    fn test_partition_rules_for_uuid() {
        let cases: [(u32, usize); 8] = [
            (2, 10_000),
            (4, 10_000),
            (8, 10_000),
            (16, 10_000),
            (32, 10_000),
            (64, 100_000),
            (128, 100_000),
            (256, 100_000),
        ];
        for (partition_num, uuid_num) in cases {
            assert!(
                check_distribution(partition_num, uuid_num),
                "uneven distribution for {} partitions",
                partition_num
            );
        }
    }

    #[test]
    fn test_invalid_partition_num() {
        for partition_num in [0, 1, 3, 6, 100, 2 * MAX_PARTITION_NUM_FOR_TRACES] {
            assert!(
                partition_rules_for_uuid(partition_num, "trace_id").is_err(),
                "partition_num {} should be rejected",
                partition_num
            );
        }
    }

    #[test]
    fn test_rules() {
        let expected = parse_exprs(&[
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ]);

        assert_eq!(
            expected,
            partition_rule_for_hexstring("trace_id").unwrap().exprs
        );
    }

    #[test]
    fn test_rules_with_multi_digit_boundaries() {
        // 32 partitions need two hex digits: boundaries step by 0x08.
        let rules = partition_rules_for_uuid(32, "trace_id").unwrap();
        assert_eq!(rules.len(), 32);
        assert_eq!(rules[0], parse_expr("trace_id < '08'"));
        assert_eq!(
            rules[1],
            parse_expr("trace_id >= '08' AND trace_id < '10'")
        );
        assert_eq!(rules[31], parse_expr("trace_id >= 'f8'"));

        // The maximum uses four hex digits, one value per partition.
        let rules = partition_rules_for_uuid(MAX_PARTITION_NUM_FOR_TRACES, "trace_id").unwrap();
        assert_eq!(rules.len(), MAX_PARTITION_NUM_FOR_TRACES as usize);
        assert_eq!(rules[0], parse_expr("trace_id < '0001'"));
        assert_eq!(rules[rules.len() - 1], parse_expr("trace_id >= 'ffff'"));
    }

    /// Parses a single SQL expression with the generic dialect.
    fn parse_expr(sql: &str) -> Expr {
        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect).try_with_sql(sql).unwrap();
        parser.parse_expr().unwrap()
    }

    /// Parses every SQL expression in `sqls`, preserving order.
    fn parse_exprs(sqls: &[&str]) -> Vec<Expr> {
        sqls.iter().copied().map(parse_expr).collect()
    }

    /// Assigns `test_uuid_num` random UUIDs to `test_partition` partitions.
    /// Returns whether every partition's share is within 30% of the ideal share.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
        assert_eq!(rules.len(), test_partition as usize);

        // Count by partition index, so that partitions receiving no UUIDs are
        // still checked. A map keyed only by hit partitions would skip them.
        let mut counts = vec![0usize; rules.len()];
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().to_string().replace('-', "").to_lowercase();
            counts[allocate_partition_for_uuid(&uuid, &rules)] += 1;
        }

        let expected_ratio = 100.0 / test_partition as f64;
        let tolerance = expected_ratio * 0.30;
        counts.iter().all(|&count| {
            let ratio = count as f64 / test_uuid_num as f64 * 100.0;
            (ratio - expected_ratio).abs() < tolerance
        })
    }

    /// Returns the index of the single rule that `uuid` satisfies.
    /// Panics if it matches no rule (a gap) or several rules (an overlap).
    fn allocate_partition_for_uuid(uuid: &str, rules: &[Expr]) -> usize {
        let mut matched = rules
            .iter()
            .enumerate()
            .filter(|(_, rule)| rule_matches(rule, uuid))
            .map(|(i, _)| i);
        let partition = matched
            .next()
            .unwrap_or_else(|| panic!("No partition found for uuid: {}, rules: {:?}", uuid, rules));
        if let Some(other) = matched.next() {
            panic!(
                "uuid {} matches both partition {} and {}, rules: {:?}",
                uuid, partition, other, rules
            );
        }
        partition
    }

    /// Evaluates a partition rule against `value`. It interprets the operators
    /// instead of relying on the rule's position in the list. Only the shapes
    /// emitted by `partition_rules_for_uuid` are supported.
    fn rule_matches(rule: &Expr, value: &str) -> bool {
        match rule {
            Expr::BinaryOp { left, op, right } => match op {
                BinaryOperator::And => rule_matches(left, value) && rule_matches(right, value),
                BinaryOperator::Lt => value < string_literal(right),
                BinaryOperator::GtEq => value >= string_literal(right),
                other => panic!("unexpected operator in partition rule: {:?}", other),
            },
            other => panic!("unexpected expression in partition rule: {:?}", other),
        }
    }

    /// Extracts the text of a single-quoted string literal expression.
    fn string_literal(expr: &Expr) -> &str {
        match expr {
            Expr::Value(Value::SingleQuotedString(s)) => s.as_str(),
            other => panic!("expected a single-quoted string literal, got {:?}", other),
        }
    }
}