1use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
21const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;
23
24const MAX_PARTITION_NUM_FOR_TRACES: u32 = 65536;
26
27macro_rules! between_string {
28 ($col: expr, $left_incl: expr, $right_excl: expr) => {
29 Expr::BinaryOp {
30 op: BinaryOperator::And,
31 left: Box::new(Expr::BinaryOp {
32 op: BinaryOperator::GtEq,
33 left: Box::new($col.clone()),
34 right: Box::new(Expr::Value(
35 Value::SingleQuotedString($left_incl.to_string()).into(),
36 )),
37 }),
38 right: Box::new(Expr::BinaryOp {
39 op: BinaryOperator::Lt,
40 left: Box::new($col.clone()),
41 right: Box::new(Expr::Value(
42 Value::SingleQuotedString($right_excl.to_string()).into(),
43 )),
44 }),
45 }
46 };
47}
48
49pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
50 Ok(Partitions {
51 column_list: vec![Ident::new(ident)],
52 exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
53 })
54}
55
56fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
58 ensure!(
59 partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
60 InvalidPartitionNumberSnafu { partition_num }
61 );
62
63 let ident_expr = Expr::Identifier(Ident::new(ident).clone());
64
65 let (total_partitions, hex_length) = {
66 match partition_num {
67 2..=16 => (16, 1),
68 17..=256 => (256, 2),
69 257..=4096 => (4096, 3),
70 4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
71 _ => unreachable!(),
72 }
73 };
74
75 let partition_size = total_partitions / partition_num;
76 let remainder = total_partitions % partition_num;
77
78 let mut rules = Vec::new();
79 let mut current_boundary = 0;
80 for i in 0..partition_num {
81 let mut size = partition_size;
82 if i < remainder {
83 size += 1;
84 }
85 let start = current_boundary;
86 let end = current_boundary + size;
87
88 if i == 0 {
89 rules.push(Expr::BinaryOp {
91 left: Box::new(ident_expr.clone()),
92 op: BinaryOperator::Lt,
93 right: Box::new(Expr::Value(
94 Value::SingleQuotedString(format!("{:0hex_length$x}", end)).into(),
95 )),
96 });
97 } else if i == partition_num - 1 {
98 rules.push(Expr::BinaryOp {
100 left: Box::new(ident_expr.clone()),
101 op: BinaryOperator::GtEq,
102 right: Box::new(Expr::Value(
103 Value::SingleQuotedString(format!("{:0hex_length$x}", start)).into(),
104 )),
105 });
106 } else {
107 rules.push(between_string!(
109 ident_expr,
110 format!("{:0hex_length$x}", start),
111 format!("{:0hex_length$x}", end)
112 ));
113 }
114
115 current_boundary = end;
116 }
117
118 Ok(rules)
119}
120
121#[cfg(test)]
122mod tests {
123 use std::collections::HashMap;
124
125 use sqlparser::ast::{Expr, ValueWithSpan};
126 use sqlparser::dialect::GenericDialect;
127 use sqlparser::parser::Parser;
128 use uuid::Uuid;
129
130 use super::*;
131
132 #[test]
133 fn test_partition_rules_for_uuid() {
134 assert!(check_distribution(2, 10_000)); assert!(check_distribution(4, 10_000)); assert!(check_distribution(8, 10_000)); assert!(check_distribution(16, 10_000)); assert!(check_distribution(32, 10_000)); assert!(check_distribution(64, 100_000)); assert!(check_distribution(128, 100_000)); assert!(check_distribution(256, 100_000)); }
145
146 #[test]
147 fn test_rules() {
148 let expr = vec![
149 "trace_id < '1'",
150 "trace_id >= '1' AND trace_id < '2'",
151 "trace_id >= '2' AND trace_id < '3'",
152 "trace_id >= '3' AND trace_id < '4'",
153 "trace_id >= '4' AND trace_id < '5'",
154 "trace_id >= '5' AND trace_id < '6'",
155 "trace_id >= '6' AND trace_id < '7'",
156 "trace_id >= '7' AND trace_id < '8'",
157 "trace_id >= '8' AND trace_id < '9'",
158 "trace_id >= '9' AND trace_id < 'a'",
159 "trace_id >= 'a' AND trace_id < 'b'",
160 "trace_id >= 'b' AND trace_id < 'c'",
161 "trace_id >= 'c' AND trace_id < 'd'",
162 "trace_id >= 'd' AND trace_id < 'e'",
163 "trace_id >= 'e' AND trace_id < 'f'",
164 "trace_id >= 'f'",
165 ];
166
167 let dialect = GenericDialect {};
168 let results = expr
169 .into_iter()
170 .map(|s| {
171 let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
172 parser.parse_expr().unwrap()
173 })
174 .collect::<Vec<Expr>>();
175
176 assert_eq!(
177 results,
178 partition_rule_for_hexstring("trace_id").unwrap().exprs
179 );
180 }
181
182 fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
183 let uuids = (0..test_uuid_num)
185 .map(|_| Uuid::new_v4().to_string().replace("-", "").to_lowercase())
186 .collect::<Vec<String>>();
187
188 let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
190
191 let mut stats = HashMap::new();
193 for uuid in uuids {
194 let partition = allocate_partition_for_uuid(uuid.clone(), &rules);
195 *stats.entry(partition).or_insert(0) += 1;
197 }
198
199 let expected_ratio = 100.0 / test_partition as f64;
201
202 let tolerance = 100.0 / test_partition as f64 * 0.30;
204
205 for (_, count) in stats {
207 let ratio = (count as f64 / test_uuid_num as f64) * 100.0;
208 if (ratio - expected_ratio).abs() >= tolerance {
209 return false;
210 }
211 }
212
213 true
214 }
215
216 fn allocate_partition_for_uuid(uuid: String, rules: &[Expr]) -> usize {
217 for (i, rule) in rules.iter().enumerate() {
218 if let Expr::BinaryOp { left, op: _, right } = rule {
219 if i == 0 {
220 if let Expr::Value(ValueWithSpan {
222 value: Value::SingleQuotedString(leftmost),
223 ..
224 }) = *right.clone()
225 {
226 if uuid < leftmost {
227 return i;
228 }
229 }
230 } else if i == rules.len() - 1 {
231 if let Expr::Value(ValueWithSpan {
233 value: Value::SingleQuotedString(rightmost),
234 ..
235 }) = *right.clone()
236 {
237 if uuid >= rightmost {
238 return i;
239 }
240 }
241 } else {
242 if let Expr::BinaryOp {
244 left: _,
245 op: _,
246 right: inner_right,
247 } = *left.clone()
248 {
249 if let Expr::Value(ValueWithSpan {
250 value: Value::SingleQuotedString(lower),
251 ..
252 }) = *inner_right.clone()
253 {
254 if let Expr::BinaryOp {
255 left: _,
256 op: _,
257 right: inner_right,
258 } = *right.clone()
259 {
260 if let Expr::Value(ValueWithSpan {
261 value: Value::SingleQuotedString(upper),
262 ..
263 }) = *inner_right.clone()
264 {
265 if uuid >= lower && uuid < upper {
266 return i;
267 }
268 }
269 }
270 }
271 }
272 }
273 }
274 }
275
276 panic!("No partition found for uuid: {}, rules: {:?}", uuid, rules);
277 }
278}