sql/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
21/// The default number of partitions for OpenTelemetry traces.
22const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;
23
24/// The maximum number of partitions for OpenTelemetry traces.
25const MAX_PARTITION_NUM_FOR_TRACES: u32 = 65536;
26
/// Builds the half-open range predicate `$col >= '<left>' AND $col < '<right>'`.
///
/// Both bounds are rendered with `to_string()` and emitted as single-quoted
/// string literals. `$col` is cloned once for each of the two comparisons.
macro_rules! between_string {
    ($col: expr, $left_incl: expr, $right_excl: expr) => {{
        // Inclusive lower bound: `$col >= '<left>'`.
        let lower_bound = Expr::BinaryOp {
            left: Box::new($col.clone()),
            op: BinaryOperator::GtEq,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($left_incl.to_string()).into(),
            )),
        };

        // Exclusive upper bound: `$col < '<right>'`.
        let upper_bound = Expr::BinaryOp {
            left: Box::new($col.clone()),
            op: BinaryOperator::Lt,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($right_excl.to_string()).into(),
            )),
        };

        Expr::BinaryOp {
            left: Box::new(lower_bound),
            op: BinaryOperator::And,
            right: Box::new(upper_bound),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
50    Ok(Partitions {
51        column_list: vec![Ident::new(ident)],
52        exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
53    })
54}
55
56// partition_rules_for_uuid can creates partition rules up to 65536 partitions.
57fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
58    ensure!(
59        partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
60        InvalidPartitionNumberSnafu { partition_num }
61    );
62
63    let ident_expr = Expr::Identifier(Ident::new(ident).clone());
64
65    let (total_partitions, hex_length) = {
66        match partition_num {
67            2..=16 => (16, 1),
68            17..=256 => (256, 2),
69            257..=4096 => (4096, 3),
70            4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
71            _ => unreachable!(),
72        }
73    };
74
75    let partition_size = total_partitions / partition_num;
76    let remainder = total_partitions % partition_num;
77
78    let mut rules = Vec::new();
79    let mut current_boundary = 0;
80    for i in 0..partition_num {
81        let mut size = partition_size;
82        if i < remainder {
83            size += 1;
84        }
85        let start = current_boundary;
86        let end = current_boundary + size;
87
88        if i == 0 {
89            // Create the leftmost rule, for example: trace_id < '1'.
90            rules.push(Expr::BinaryOp {
91                left: Box::new(ident_expr.clone()),
92                op: BinaryOperator::Lt,
93                right: Box::new(Expr::Value(
94                    Value::SingleQuotedString(format!("{:0hex_length$x}", end)).into(),
95                )),
96            });
97        } else if i == partition_num - 1 {
98            // Create the rightmost rule, for example: trace_id >= 'f'.
99            rules.push(Expr::BinaryOp {
100                left: Box::new(ident_expr.clone()),
101                op: BinaryOperator::GtEq,
102                right: Box::new(Expr::Value(
103                    Value::SingleQuotedString(format!("{:0hex_length$x}", start)).into(),
104                )),
105            });
106        } else {
107            // Create the middle rules, for example: trace_id >= '1' AND trace_id < '2'.
108            rules.push(between_string!(
109                ident_expr,
110                format!("{:0hex_length$x}", start),
111                format!("{:0hex_length$x}", end)
112            ));
113        }
114
115        current_boundary = end;
116    }
117
118    Ok(rules)
119}
120
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use sqlparser::ast::{Expr, ValueWithSpan};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    #[test]
    fn test_partition_rules_for_uuid() {
        // NOTE: We only test a subset of partitions to keep the test execution time reasonable.
        // As the number of partitions increases, we need to increase the number of test samples to ensure uniform distribution.
        assert!(check_distribution(2, 10_000)); // 2^1
        assert!(check_distribution(4, 10_000)); // 2^2
        assert!(check_distribution(8, 10_000)); // 2^3
        assert!(check_distribution(16, 10_000)); // 2^4
        assert!(check_distribution(32, 10_000)); // 2^5
        assert!(check_distribution(64, 100_000)); // 2^6
        assert!(check_distribution(128, 100_000)); // 2^7
        assert!(check_distribution(256, 100_000)); // 2^8
    }

    #[test]
    fn test_invalid_partition_num() {
        // Zero, one, non-powers of two and values above the maximum are all rejected.
        for partition_num in [0, 1, 3, 100, MAX_PARTITION_NUM_FOR_TRACES * 2] {
            assert!(partition_rules_for_uuid(partition_num, "trace_id").is_err());
        }
    }

    #[test]
    fn test_rules() {
        let expr = vec![
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ];

        let results = expr.into_iter().map(parse_expr).collect::<Vec<Expr>>();

        assert_eq!(
            results,
            partition_rule_for_hexstring("trace_id").unwrap().exprs
        );
    }

    #[test]
    fn test_rules_are_zero_padded() {
        // 32 partitions need two hex digits, so every boundary is padded to width 2.
        let rules = partition_rules_for_uuid(32, "trace_id").unwrap();

        assert_eq!(rules.len(), 32);
        assert_eq!(rules[0], parse_expr("trace_id < '08'"));
        assert_eq!(
            rules[1],
            parse_expr("trace_id >= '08' AND trace_id < '10'")
        );
        assert_eq!(rules[31], parse_expr("trace_id >= 'f8'"));
    }

    /// Parses `sql` as a single expression using the generic dialect.
    fn parse_expr(sql: &str) -> Expr {
        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect).try_with_sql(sql).unwrap();
        parser.parse_expr().unwrap()
    }

    /// Assigns `test_uuid_num` random UUIDs to the rules generated for
    /// `test_partition` partitions and returns whether every partition,
    /// including ones that received no UUID at all, holds a share within 30%
    /// of the uniform share.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        // Generate the partition rules.
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();

        // Count the number of random uuids that land in each partition.
        let mut stats: HashMap<usize, usize> = HashMap::new();
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().to_string().replace("-", "").to_lowercase();
            let partition = allocate_partition_for_uuid(&uuid, &rules);
            *stats.entry(partition).or_insert(0) += 1;
        }

        // The share (in percent) each partition gets under a uniform distribution.
        let expected_ratio = 100.0 / test_partition as f64;

        // tolerance is the allowed deviation from the expected ratio.
        let tolerance = expected_ratio * 0.30;

        // Walk every partition index rather than the map entries, so that a partition
        // which never received a uuid is counted as zero instead of being skipped.
        (0..rules.len()).all(|partition| {
            let count = stats.get(&partition).copied().unwrap_or(0);
            let ratio = (count as f64 / test_uuid_num as f64) * 100.0;
            (ratio - expected_ratio).abs() < tolerance
        })
    }

    /// Returns the text of `expr` when it is a single-quoted string literal.
    fn string_literal(expr: &Expr) -> Option<&str> {
        match expr {
            Expr::Value(ValueWithSpan {
                value: Value::SingleQuotedString(text),
                ..
            }) => Some(text.as_str()),
            _ => None,
        }
    }

    /// Returns the string literal on the right-hand side of a binary comparison.
    fn comparison_bound(expr: &Expr) -> Option<&str> {
        match expr {
            Expr::BinaryOp { right, .. } => string_literal(right),
            _ => None,
        }
    }

    /// Returns the index of the first rule in `rules` that `uuid` satisfies.
    ///
    /// The first rule is read as `< upper`, the last as `>= lower`, and every other
    /// rule as `>= lower AND < upper`. Strings are compared lexicographically.
    ///
    /// # Panics
    ///
    /// Panics when no rule matches `uuid`.
    fn allocate_partition_for_uuid(uuid: &str, rules: &[Expr]) -> usize {
        for (i, rule) in rules.iter().enumerate() {
            let (left, right) = match rule {
                Expr::BinaryOp { left, right, .. } => (left, right),
                _ => continue,
            };

            let hit = if i == 0 {
                // Hit the leftmost rule.
                matches!(string_literal(right), Some(upper) if uuid < upper)
            } else if i + 1 == rules.len() {
                // Hit the rightmost rule.
                matches!(string_literal(right), Some(lower) if uuid >= lower)
            } else {
                // Hit the middle rules.
                matches!(
                    (comparison_bound(left), comparison_bound(right)),
                    (Some(lower), Some(upper)) if uuid >= lower && uuid < upper
                )
            };

            if hit {
                return i;
            }
        }

        panic!("No partition found for uuid: {}, rules: {:?}", uuid, rules);
    }
}
278}