Skip to main content

sql/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
/// Number of partitions created for an OpenTelemetry traces table when the caller does not
/// request a specific count.
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 1 << 4;

/// Upper limit on the partition count for OpenTelemetry traces: `16^4`, i.e. the number of
/// distinct four-hex-digit prefixes.
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 16_u32.pow(4);
26
/// Expands to the half-open string range `$column >= '<lower>' AND $column < '<upper>'`.
///
/// `$column` must evaluate to an [`Expr`]; it is cloned once for each comparison. Both bounds
/// are turned into single-quoted string literals via `.to_string()`.
macro_rules! between_string {
    ($column:expr, $lower_inclusive:expr, $upper_exclusive:expr) => {{
        let lower_bound = Expr::BinaryOp {
            left: Box::new($column.clone()),
            op: BinaryOperator::GtEq,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($lower_inclusive.to_string()).into(),
            )),
        };
        let upper_bound = Expr::BinaryOp {
            left: Box::new($column.clone()),
            op: BinaryOperator::Lt,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($upper_exclusive.to_string()).into(),
            )),
        };
        Expr::BinaryOp {
            left: Box::new(lower_bound),
            op: BinaryOperator::And,
            right: Box::new(upper_bound),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str, partition_num: Option<u32>) -> Result<Partitions> {
50    Ok(Partitions {
51        column_list: vec![Ident::new(ident)],
52        exprs: partition_rules_for_uuid(
53            partition_num.unwrap_or(DEFAULT_PARTITION_NUM_FOR_TRACES),
54            ident,
55        )?,
56    })
57}
58
59// partition_rules_for_uuid can creates partition rules up to 65536 partitions.
60fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
61    ensure!(
62        partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
63        InvalidPartitionNumberSnafu { partition_num }
64    );
65
66    let ident_expr = Expr::Identifier(Ident::new(ident).clone());
67
68    let (total_partitions, hex_length) = {
69        match partition_num {
70            2..=16 => (16, 1),
71            17..=256 => (256, 2),
72            257..=4096 => (4096, 3),
73            4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
74            _ => unreachable!(),
75        }
76    };
77
78    let partition_size = total_partitions / partition_num;
79    let remainder = total_partitions % partition_num;
80
81    let mut rules = Vec::new();
82    let mut current_boundary = 0;
83    for i in 0..partition_num {
84        let mut size = partition_size;
85        if i < remainder {
86            size += 1;
87        }
88        let start = current_boundary;
89        let end = current_boundary + size;
90
91        if i == 0 {
92            // Create the leftmost rule, for example: trace_id < '1'.
93            rules.push(Expr::BinaryOp {
94                left: Box::new(ident_expr.clone()),
95                op: BinaryOperator::Lt,
96                right: Box::new(Expr::Value(
97                    Value::SingleQuotedString(format!("{:0hex_length$x}", end)).into(),
98                )),
99            });
100        } else if i == partition_num - 1 {
101            // Create the rightmost rule, for example: trace_id >= 'f'.
102            rules.push(Expr::BinaryOp {
103                left: Box::new(ident_expr.clone()),
104                op: BinaryOperator::GtEq,
105                right: Box::new(Expr::Value(
106                    Value::SingleQuotedString(format!("{:0hex_length$x}", start)).into(),
107                )),
108            });
109        } else {
110            // Create the middle rules, for example: trace_id >= '1' AND trace_id < '2'.
111            rules.push(between_string!(
112                ident_expr,
113                format!("{:0hex_length$x}", start),
114                format!("{:0hex_length$x}", end)
115            ));
116        }
117
118        current_boundary = end;
119    }
120
121    Ok(rules)
122}
123
#[cfg(test)]
mod tests {
    use sqlparser::ast::{Expr, ValueWithSpan};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    /// Parses each SQL snippet into an [`Expr`] with the generic dialect, panicking on any
    /// parse failure.
    fn parse_exprs(sqls: &[&str]) -> Vec<Expr> {
        let dialect = GenericDialect {};
        sqls.iter()
            .map(|&sql| {
                let mut parser = Parser::new(&dialect).try_with_sql(sql).unwrap();
                parser.parse_expr().unwrap()
            })
            .collect()
    }

    #[test]
    fn test_partition_rules_for_uuid() {
        // Only a subset of partition counts is exercised to keep the test fast. Larger counts
        // need more sampled UUIDs before every partition's share settles near the expectation.
        assert!(check_distribution(2, 10_000)); // 2^1
        assert!(check_distribution(4, 10_000)); // 2^2
        assert!(check_distribution(8, 10_000)); // 2^3
        assert!(check_distribution(16, 10_000)); // 2^4
        assert!(check_distribution(32, 10_000)); // 2^5
        assert!(check_distribution(64, 100_000)); // 2^6
        assert!(check_distribution(128, 100_000)); // 2^7
        assert!(check_distribution(256, 100_000)); // 2^8
    }

    #[test]
    fn test_rules() {
        // Default partition count: one partition per leading hex digit.
        let expected_default = parse_exprs(&[
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ]);
        let actual_default = partition_rule_for_hexstring("trace_id", None)
            .unwrap()
            .exprs;
        assert_eq!(expected_default, actual_default);

        // Custom partition count: four partitions spanning four leading hex digits each.
        let expected_custom = parse_exprs(&[
            "trace_id < '4'",
            "trace_id >= '4' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < 'c'",
            "trace_id >= 'c'",
        ]);
        let actual_custom = partition_rule_for_hexstring("trace_id", Some(4))
            .unwrap()
            .exprs;
        assert_eq!(expected_custom, actual_custom);
    }

    /// Routes `test_uuid_num` random v4 UUIDs (simple hex form) through the rules generated for
    /// `test_partition` partitions and reports whether every partition's share of the samples
    /// stays strictly within 30% (relative) of the uniform share.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);

        let mut counts = vec![0usize; test_partition as usize];
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().simple().to_string();
            counts[allocate_partition_for_uuid(&uuid, &upper_bounds)] += 1;
        }

        // Both values are percentages: the uniform share and the allowed deviation from it.
        let expected_ratio = 100.0 / test_partition as f64;
        let tolerance = expected_ratio * 0.30;

        let total = test_uuid_num as f64;
        !counts.iter().any(|&count| {
            let ratio = (count as f64 / total) * 100.0;
            (ratio - expected_ratio).abs() >= tolerance
        })
    }

    /// Checks that `rules` form a contiguous chain: the first rule has no lower bound, every
    /// later rule starts at the previous rule's upper bound, and only the last rule (when there
    /// is more than one) lacks an upper bound. Returns the upper bounds of every rule except the
    /// last, in order.
    fn upper_bounds_from_rules<'a>(rules: &'a [Expr]) -> Vec<&'a str> {
        let last_idx = rules.len().saturating_sub(1);
        let mut upper_bounds = Vec::with_capacity(last_idx);
        let mut previous_upper: Option<&'a str> = None;

        for (idx, rule) in rules.iter().enumerate() {
            let (lower, upper) = extract_rule_bounds(rule);
            if idx == 0 {
                assert!(lower.is_none());
                assert!(upper.is_some());
            } else {
                assert_eq!(lower, previous_upper);
                if idx == last_idx {
                    assert!(upper.is_none());
                } else {
                    assert!(upper.is_some());
                }
            }

            if idx < last_idx {
                // Every non-last rule was asserted above to carry an upper bound.
                upper_bounds.extend(upper);
            }
            previous_upper = upper;
        }

        upper_bounds
    }

    /// Returns the `(inclusive lower, exclusive upper)` string literals of a partition rule:
    /// `col < 'x'` gives `(None, Some("x"))`, `col >= 'x'` gives `(Some("x"), None)`,
    /// `col >= 'x' AND col < 'y'` gives `(Some("x"), Some("y"))`, anything else `(None, None)`.
    fn extract_rule_bounds(rule: &Expr) -> (Option<&str>, Option<&str>) {
        // Text of a single-quoted string literal, if `expr` is one.
        fn quoted_literal(expr: &Expr) -> Option<&str> {
            if let Expr::Value(ValueWithSpan {
                value: Value::SingleQuotedString(text),
                ..
            }) = expr
            {
                Some(text.as_str())
            } else {
                None
            }
        }

        // Right-hand literal of `expr` when it is a binary operation using `wanted`.
        fn literal_operand<'e>(expr: &'e Expr, wanted: &BinaryOperator) -> Option<&'e str> {
            match expr {
                Expr::BinaryOp { op, right, .. } if op == wanted => quoted_literal(right),
                _ => None,
            }
        }

        if let Expr::BinaryOp {
            op: BinaryOperator::And,
            left,
            right,
        } = rule
        {
            (
                literal_operand(left, &BinaryOperator::GtEq),
                literal_operand(right, &BinaryOperator::Lt),
            )
        } else {
            (
                literal_operand(rule, &BinaryOperator::GtEq),
                literal_operand(rule, &BinaryOperator::Lt),
            )
        }
    }

    /// Index of the partition `uuid` belongs to: the number of (ascending) upper bounds that
    /// are less than or equal to `uuid`.
    fn allocate_partition_for_uuid(uuid: &str, upper_bounds: &[&str]) -> usize {
        upper_bounds.partition_point(|&bound| bound <= uuid)
    }

    #[test]
    fn test_extract_rule_bounds() {
        let rules = partition_rules_for_uuid(16, "trace_id").unwrap();
        assert_eq!(upper_bounds_from_rules(&rules).len(), 15);
    }
}