sql/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
/// The default number of partitions for OpenTelemetry traces.
///
/// 2^4 = 16, i.e. one partition per leading hex digit of the partition column.
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 1 << 4;

/// The maximum number of partitions for OpenTelemetry traces.
///
/// 16^4 = 65536, i.e. one partition per distinct 4-hex-digit prefix.
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 16u32.pow(4);
26
/// Builds the half-open string range predicate `col >= 'left_incl' AND col < 'right_excl'`.
///
/// - `$col` must evaluate to an `Expr` (or a reference to one). It is evaluated exactly
///   once and cloned into both comparisons, so side effects in the argument run only once.
/// - `$left_incl` / `$right_excl` can be any value implementing `ToString`. They become
///   single-quoted string literals.
///
/// `Expr`, `BinaryOperator` and `Value` are resolved where the macro is invoked, so they
/// must be in scope there.
macro_rules! between_string {
    ($col: expr, $left_incl: expr, $right_excl: expr) => {{
        // Bind the column once instead of re-expanding `$col` for each comparison.
        let col: &Expr = &$col;
        Expr::BinaryOp {
            op: BinaryOperator::And,
            left: Box::new(Expr::BinaryOp {
                op: BinaryOperator::GtEq,
                left: Box::new(col.clone()),
                right: Box::new(Expr::Value(
                    Value::SingleQuotedString($left_incl.to_string()).into(),
                )),
            }),
            right: Box::new(Expr::BinaryOp {
                op: BinaryOperator::Lt,
                left: Box::new(col.clone()),
                right: Box::new(Expr::Value(
                    Value::SingleQuotedString($right_excl.to_string()).into(),
                )),
            }),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
50    Ok(Partitions {
51        column_list: vec![Ident::new(ident)],
52        exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
53    })
54}
55
56// partition_rules_for_uuid can creates partition rules up to 65536 partitions.
57fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
58    ensure!(
59        partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
60        InvalidPartitionNumberSnafu { partition_num }
61    );
62
63    let ident_expr = Expr::Identifier(Ident::new(ident).clone());
64
65    let (total_partitions, hex_length) = {
66        match partition_num {
67            2..=16 => (16, 1),
68            17..=256 => (256, 2),
69            257..=4096 => (4096, 3),
70            4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
71            _ => unreachable!(),
72        }
73    };
74
75    let partition_size = total_partitions / partition_num;
76    let remainder = total_partitions % partition_num;
77
78    let mut rules = Vec::new();
79    let mut current_boundary = 0;
80    for i in 0..partition_num {
81        let mut size = partition_size;
82        if i < remainder {
83            size += 1;
84        }
85        let start = current_boundary;
86        let end = current_boundary + size;
87
88        if i == 0 {
89            // Create the leftmost rule, for example: trace_id < '1'.
90            rules.push(Expr::BinaryOp {
91                left: Box::new(ident_expr.clone()),
92                op: BinaryOperator::Lt,
93                right: Box::new(Expr::Value(
94                    Value::SingleQuotedString(format!("{:0hex_length$x}", end)).into(),
95                )),
96            });
97        } else if i == partition_num - 1 {
98            // Create the rightmost rule, for example: trace_id >= 'f'.
99            rules.push(Expr::BinaryOp {
100                left: Box::new(ident_expr.clone()),
101                op: BinaryOperator::GtEq,
102                right: Box::new(Expr::Value(
103                    Value::SingleQuotedString(format!("{:0hex_length$x}", start)).into(),
104                )),
105            });
106        } else {
107            // Create the middle rules, for example: trace_id >= '1' AND trace_id < '2'.
108            rules.push(between_string!(
109                ident_expr,
110                format!("{:0hex_length$x}", start),
111                format!("{:0hex_length$x}", end)
112            ));
113        }
114
115        current_boundary = end;
116    }
117
118    Ok(rules)
119}
120
#[cfg(test)]
mod tests {
    use sqlparser::ast::{Expr, ValueWithSpan};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    #[test]
    fn test_partition_rules_for_uuid() {
        // NOTE: We only test a subset of partitions to keep the test execution time reasonable.
        // As the number of partitions increases, we need to increase the number of test samples to ensure uniform distribution.
        assert!(check_distribution(2, 10_000)); // 2^1
        assert!(check_distribution(4, 10_000)); // 2^2
        assert!(check_distribution(8, 10_000)); // 2^3
        assert!(check_distribution(16, 10_000)); // 2^4
        assert!(check_distribution(32, 10_000)); // 2^5
        assert!(check_distribution(64, 100_000)); // 2^6
        assert!(check_distribution(128, 100_000)); // 2^7
        assert!(check_distribution(256, 100_000)); // 2^8
    }

    #[test]
    fn test_rules() {
        let expr = [
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ];

        let dialect = GenericDialect {};
        let results = expr
            .into_iter()
            .map(|s| {
                let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
                parser.parse_expr().unwrap()
            })
            .collect::<Vec<Expr>>();

        assert_eq!(
            results,
            partition_rule_for_hexstring("trace_id").unwrap().exprs
        );
    }

    #[test]
    fn test_invalid_partition_num() {
        // Zero, one, non-powers of two and anything above the maximum are rejected.
        let invalid = [
            0,
            1,
            3,
            6,
            100,
            MAX_PARTITION_NUM_FOR_TRACES + 1,
            MAX_PARTITION_NUM_FOR_TRACES * 2,
        ];
        for partition_num in invalid {
            assert!(
                partition_rules_for_uuid(partition_num, "trace_id").is_err(),
                "partition_num {partition_num} should be rejected"
            );
        }
    }

    #[test]
    fn test_partition_boundaries() {
        // (partition_num, first upper bound, last upper bound). Covers the smallest and
        // largest accepted values and each switch to a longer hex prefix.
        let cases = [
            (2, "8", "8"),
            (16, "1", "f"),
            (32, "08", "f8"),
            (256, "01", "ff"),
            (512, "008", "ff8"),
            (4096, "001", "fff"),
            (8192, "0008", "fff8"),
            (MAX_PARTITION_NUM_FOR_TRACES, "0001", "ffff"),
        ];
        for (partition_num, first, last) in cases {
            let rules = partition_rules_for_uuid(partition_num, "trace_id").unwrap();
            assert_eq!(rules.len(), partition_num as usize);
            // `upper_bounds_from_rules` also asserts the rules are contiguous.
            let upper_bounds = upper_bounds_from_rules(&rules);
            assert_eq!(upper_bounds.len(), partition_num as usize - 1);
            assert_eq!(upper_bounds.first().copied(), Some(first));
            assert_eq!(upper_bounds.last().copied(), Some(last));
        }
    }

    /// Returns true when every partition receives a share of `test_uuid_num` random v4
    /// UUIDs within 30% (relative) of the uniform share `1 / test_partition`.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        // Generate the partition rules.
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);

        // Collect the number of partitions for each uuid.
        let mut counts = vec![0usize; test_partition as usize];
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().simple().to_string();
            let partition = allocate_partition_for_uuid(uuid.as_str(), &upper_bounds);
            counts[partition] += 1;
        }

        // Check if the partition distribution is uniform.
        let expected_ratio = 100.0 / test_partition as f64;

        // tolerance is the allowed deviation from the expected ratio.
        let tolerance = 100.0 / test_partition as f64 * 0.30;

        // For each partition, its ratio should be as close as possible to the expected ratio.
        for count in counts {
            let ratio = (count as f64 / test_uuid_num as f64) * 100.0;
            if (ratio - expected_ratio).abs() >= tolerance {
                return false;
            }
        }

        true
    }

    /// Collects the exclusive upper bound of every rule except the last one.
    ///
    /// Along the way it asserts that the rules form a contiguous chain. The first rule has
    /// only an upper bound, the last has only a lower bound, and each lower bound equals
    /// the previous rule's upper bound. Panics on an empty slice.
    fn upper_bounds_from_rules<'a>(rules: &'a [Expr]) -> Vec<&'a str> {
        let mut upper_bounds = Vec::with_capacity(rules.len().saturating_sub(1));
        let mut prev_upper: Option<&'a str> = None;

        for (idx, rule) in rules.iter().enumerate() {
            let (lower, upper) = extract_rule_bounds(rule);
            match idx {
                0 => {
                    assert!(lower.is_none());
                    assert!(upper.is_some());
                }
                idx if idx == rules.len() - 1 => {
                    assert_eq!(lower, prev_upper);
                    assert!(upper.is_none());
                }
                _ => {
                    assert_eq!(lower, prev_upper);
                    assert!(upper.is_some());
                }
            }

            if idx < rules.len() - 1 {
                upper_bounds.push(upper.unwrap());
            }

            prev_upper = upper;
        }

        upper_bounds
    }

    /// Returns the `(inclusive lower, exclusive upper)` string literals of a rule shaped like
    /// `col < 'x'`, `col >= 'x'` or `col >= 'x' AND col < 'y'`. Any missing side, or any
    /// other shape, yields `None`.
    fn extract_rule_bounds(rule: &Expr) -> (Option<&str>, Option<&str>) {
        fn extract_single_quoted_string(expr: &Expr) -> Option<&str> {
            match expr {
                Expr::Value(ValueWithSpan {
                    value: Value::SingleQuotedString(value),
                    ..
                }) => Some(value.as_str()),
                _ => None,
            }
        }

        match rule {
            Expr::BinaryOp {
                op: BinaryOperator::Lt,
                right,
                ..
            } => (None, extract_single_quoted_string(right)),
            Expr::BinaryOp {
                op: BinaryOperator::GtEq,
                right,
                ..
            } => (extract_single_quoted_string(right), None),
            Expr::BinaryOp {
                op: BinaryOperator::And,
                left,
                right,
            } => {
                let lower = match left.as_ref() {
                    Expr::BinaryOp {
                        op: BinaryOperator::GtEq,
                        right,
                        ..
                    } => extract_single_quoted_string(right),
                    _ => None,
                };
                let upper = match right.as_ref() {
                    Expr::BinaryOp {
                        op: BinaryOperator::Lt,
                        right,
                        ..
                    } => extract_single_quoted_string(right),
                    _ => None,
                };
                (lower, upper)
            }
            _ => (None, None),
        }
    }

    /// Index of the partition `uuid` falls into: the number of (ascending) upper bounds
    /// that are `<= uuid`.
    fn allocate_partition_for_uuid(uuid: &str, upper_bounds: &[&str]) -> usize {
        upper_bounds.partition_point(|upper| uuid >= *upper)
    }

    #[test]
    fn test_extract_rule_bounds() {
        let rules = partition_rules_for_uuid(16, "trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);
        // The 15 inner boundaries of a 16-way split are the hex digits 1..=f.
        let expected: Vec<String> = (1..16u32).map(|b| format!("{b:x}")).collect();
        assert_eq!(upper_bounds, expected);
    }

    #[test]
    fn test_allocate_partition_for_uuid() {
        let rules = partition_rules_for_uuid(16, "trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);
        let cases = [
            ("00000000000000000000000000000000", 0),
            ("0fffffffffffffffffffffffffffffff", 0),
            ("10000000000000000000000000000000", 1),
            ("a0000000000000000000000000000000", 10),
            ("ffffffffffffffffffffffffffffffff", 15),
        ];
        for (uuid, expected) in cases {
            assert_eq!(
                allocate_partition_for_uuid(uuid, &upper_bounds),
                expected,
                "uuid {uuid}"
            );
        }
    }
}
300}