sql/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
/// The default number of partitions for OpenTelemetry traces.
///
/// This is the partition count that `partition_rule_for_hexstring` passes to
/// `partition_rules_for_uuid`.
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;

/// The maximum number of partitions for OpenTelemetry traces.
///
/// It equals `16^4`, i.e. the number of distinct values of the 4-character hex prefix that
/// `partition_rules_for_uuid` uses for its largest partition counts.
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 65536;
26
/// Builds the expression `col >= 'lower' AND col < 'upper'`.
///
/// - `$column`: the expression to compare; it is cloned once for each side of the `AND`.
/// - `$lower`: the inclusive lower bound, turned into a single-quoted string literal.
/// - `$upper`: the exclusive upper bound, turned into a single-quoted string literal.
macro_rules! between_string {
    ($column:expr, $lower:expr, $upper:expr) => {{
        // `col >= 'lower'`
        let lower_bound = Expr::BinaryOp {
            left: Box::new($column.clone()),
            op: BinaryOperator::GtEq,
            right: Box::new(Expr::Value(Value::SingleQuotedString($lower.to_string()))),
        };
        // `col < 'upper'`
        let upper_bound = Expr::BinaryOp {
            left: Box::new($column.clone()),
            op: BinaryOperator::Lt,
            right: Box::new(Expr::Value(Value::SingleQuotedString($upper.to_string()))),
        };
        Expr::BinaryOp {
            left: Box::new(lower_bound),
            op: BinaryOperator::And,
            right: Box::new(upper_bound),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
50    Ok(Partitions {
51        column_list: vec![Ident::new(ident)],
52        exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
53    })
54}
55
56// partition_rules_for_uuid can creates partition rules up to 65536 partitions.
57fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
58    ensure!(
59        partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
60        InvalidPartitionNumberSnafu { partition_num }
61    );
62
63    let ident_expr = Expr::Identifier(Ident::new(ident).clone());
64
65    let (total_partitions, hex_length) = {
66        match partition_num {
67            2..=16 => (16, 1),
68            17..=256 => (256, 2),
69            257..=4096 => (4096, 3),
70            4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
71            _ => unreachable!(),
72        }
73    };
74
75    let partition_size = total_partitions / partition_num;
76    let remainder = total_partitions % partition_num;
77
78    let mut rules = Vec::new();
79    let mut current_boundary = 0;
80    for i in 0..partition_num {
81        let mut size = partition_size;
82        if i < remainder {
83            size += 1;
84        }
85        let start = current_boundary;
86        let end = current_boundary + size;
87
88        if i == 0 {
89            // Create the leftmost rule, for example: trace_id < '1'.
90            rules.push(Expr::BinaryOp {
91                left: Box::new(ident_expr.clone()),
92                op: BinaryOperator::Lt,
93                right: Box::new(Expr::Value(Value::SingleQuotedString(format!(
94                    "{:0hex_length$x}",
95                    end
96                )))),
97            });
98        } else if i == partition_num - 1 {
99            // Create the rightmost rule, for example: trace_id >= 'f'.
100            rules.push(Expr::BinaryOp {
101                left: Box::new(ident_expr.clone()),
102                op: BinaryOperator::GtEq,
103                right: Box::new(Expr::Value(Value::SingleQuotedString(format!(
104                    "{:0hex_length$x}",
105                    start
106                )))),
107            });
108        } else {
109            // Create the middle rules, for example: trace_id >= '1' AND trace_id < '2'.
110            rules.push(between_string!(
111                ident_expr,
112                format!("{:0hex_length$x}", start),
113                format!("{:0hex_length$x}", end)
114            ));
115        }
116
117        current_boundary = end;
118    }
119
120    Ok(rules)
121}
122
#[cfg(test)]
mod tests {
    use sqlparser::ast::Expr;
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    #[test]
    fn test_partition_rules_for_uuid() {
        // NOTE: We only test a subset of partitions to keep the test execution time reasonable.
        // As the number of partitions increases, we need to increase the number of test samples to ensure uniform distribution.
        assert!(check_distribution(2, 10_000)); // 2^1
        assert!(check_distribution(4, 10_000)); // 2^2
        assert!(check_distribution(8, 10_000)); // 2^3
        assert!(check_distribution(16, 10_000)); // 2^4
        assert!(check_distribution(32, 10_000)); // 2^5
        assert!(check_distribution(64, 100_000)); // 2^6
        assert!(check_distribution(128, 100_000)); // 2^7
        assert!(check_distribution(256, 100_000)); // 2^8
    }

    #[test]
    fn test_invalid_partition_num() {
        // Zero, one, non-powers of two and values above the maximum must all be rejected.
        for partition_num in [0, 1, 3, 100, MAX_PARTITION_NUM_FOR_TRACES * 2] {
            assert!(partition_rules_for_uuid(partition_num, "test_trace_id").is_err());
        }
    }

    #[test]
    fn test_rules() {
        let expr = vec![
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ];

        let dialect = GenericDialect {};
        let results = expr
            .into_iter()
            .map(|s| {
                let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
                parser.parse_expr().unwrap()
            })
            .collect::<Vec<Expr>>();

        assert_eq!(
            results,
            partition_rule_for_hexstring("trace_id").unwrap().exprs
        );
    }

    /// Generates `test_uuid_num` random uuids, assigns each one to a partition using the rules
    /// for `test_partition` partitions, and returns whether every partition received a share
    /// of the uuids close enough to the uniform share.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        // Generate the partition rules.
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
        if rules.len() != test_partition as usize {
            return false;
        }

        // Keep one counter per rule so that a partition that receives no uuid at all is still
        // checked below instead of being silently skipped.
        let mut counts = vec![0usize; rules.len()];
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().to_string().replace("-", "").to_lowercase();
            counts[allocate_partition_for_uuid(&uuid, &rules)] += 1;
        }

        // The share (in percent) each partition gets under a perfectly uniform distribution.
        let expected_ratio = 100.0 / test_partition as f64;

        // tolerance is the allowed deviation from the expected ratio.
        let tolerance = expected_ratio * 0.30;

        // For each partition, its ratio should be as close as possible to the expected ratio.
        counts.into_iter().all(|count| {
            let ratio = (count as f64 / test_uuid_num as f64) * 100.0;
            (ratio - expected_ratio).abs() < tolerance
        })
    }

    /// Returns the text of `expr` if it is a single-quoted string literal.
    fn string_literal(expr: &Expr) -> Option<&str> {
        match expr {
            Expr::Value(Value::SingleQuotedString(s)) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns the string literal on the right-hand side of a binary expression such as
    /// `col >= 'a'`.
    fn comparison_bound(expr: &Expr) -> Option<&str> {
        match expr {
            Expr::BinaryOp { right, .. } => string_literal(right),
            _ => None,
        }
    }

    /// Returns the index of the first rule in `rules` that `uuid` satisfies.
    ///
    /// The first rule is read as `col < 'x'`, the last one as `col >= 'x'`, and every rule in
    /// between as `col >= 'x' AND col < 'y'`.
    ///
    /// Panics if no rule matches.
    fn allocate_partition_for_uuid(uuid: &str, rules: &[Expr]) -> usize {
        for (i, rule) in rules.iter().enumerate() {
            let (left, right) = match rule {
                Expr::BinaryOp { left, right, .. } => (left, right),
                _ => continue,
            };

            let matched = if i == 0 {
                // Hit the leftmost rule.
                match string_literal(right) {
                    Some(leftmost) => uuid < leftmost,
                    None => false,
                }
            } else if i + 1 == rules.len() {
                // Hit the rightmost rule.
                match string_literal(right) {
                    Some(rightmost) => uuid >= rightmost,
                    None => false,
                }
            } else {
                // Hit the middle rules.
                match (comparison_bound(left), comparison_bound(right)) {
                    (Some(lower), Some(upper)) => uuid >= lower && uuid < upper,
                    _ => false,
                }
            };

            if matched {
                return i;
            }
        }

        panic!("No partition found for uuid: {}, rules: {:?}", uuid, rules);
    }
}