partition/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::{Debug, Display, Formatter};
18use std::sync::Arc;
19
20use common_meta::rpc::router::Partition as MetaPartition;
21use datafusion_expr::Operator;
22use datatypes::arrow::array::{BooleanArray, RecordBatch};
23use datatypes::prelude::Value;
24use itertools::Itertools;
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::RegionNumber;
28
29use crate::error::{self, Error, Result};
30
31pub type PartitionRuleRef = Arc<dyn PartitionRule>;
32
33pub trait PartitionRule: Sync + Send {
34    fn as_any(&self) -> &dyn Any;
35
36    fn partition_columns(&self) -> Vec<String>;
37
38    /// Finds the target region by the partition values.
39    ///
40    /// Note that the `values` should have the same length as the `partition_columns`.
41    fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
42
43    /// Split the record batch into multiple regions by the partition values.
44    /// The result is a map from region number to a boolean array, where the boolean array is true for the rows that match the partition values.
45    fn split_record_batch(
46        &self,
47        record_batch: &RecordBatch,
48    ) -> Result<HashMap<RegionNumber, BooleanArray>>;
49}
50
51/// The right bound(exclusive) of partition range.
52#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
53pub enum PartitionBound {
54    Value(Value),
55    MaxValue,
56    Expr(crate::expr::PartitionExpr),
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct PartitionDef {
61    partition_columns: Vec<String>,
62    partition_bounds: Vec<PartitionBound>,
63}
64
65impl Display for PartitionBound {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::Value(v) => write!(f, "{}", v),
69            Self::MaxValue => write!(f, "MAXVALUE"),
70            Self::Expr(e) => write!(f, "{}", e),
71        }
72    }
73}
74
75impl Display for PartitionDef {
76    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77        write!(
78            f,
79            "{}",
80            self.partition_bounds
81                .iter()
82                .map(|b| format!("{b}"))
83                .join(", ")
84        )
85    }
86}
87
88impl PartitionDef {
89    pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
90        Self {
91            partition_columns,
92            partition_bounds,
93        }
94    }
95
96    pub fn partition_columns(&self) -> &Vec<String> {
97        &self.partition_columns
98    }
99
100    pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
101        &self.partition_bounds
102    }
103}
104
105impl TryFrom<&MetaPartition> for PartitionDef {
106    type Error = Error;
107
108    fn try_from(partition: &MetaPartition) -> Result<Self> {
109        let MetaPartition {
110            column_list,
111            value_list,
112        } = partition;
113
114        let partition_columns = column_list
115            .iter()
116            .map(|x| String::from_utf8_lossy(x).to_string())
117            .collect::<Vec<String>>();
118
119        let partition_bounds = value_list
120            .iter()
121            .map(|x| serde_json::from_str(&String::from_utf8_lossy(x)))
122            .collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
123            .context(error::DeserializeJsonSnafu)?;
124
125        Ok(PartitionDef {
126            partition_columns,
127            partition_bounds,
128        })
129    }
130}
131
132impl TryFrom<PartitionDef> for MetaPartition {
133    type Error = Error;
134
135    fn try_from(partition: PartitionDef) -> Result<Self> {
136        let PartitionDef {
137            partition_columns: columns,
138            partition_bounds: bounds,
139        } = partition;
140
141        let column_list = columns
142            .into_iter()
143            .map(|x| x.into_bytes())
144            .collect::<Vec<Vec<u8>>>();
145
146        let value_list = bounds
147            .into_iter()
148            .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
149            .collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
150            .context(error::SerializeJsonSnafu)?;
151
152        Ok(MetaPartition {
153            column_list,
154            value_list,
155        })
156    }
157}
158
159#[derive(Debug, PartialEq, Eq)]
160pub struct PartitionExpr {
161    pub column: String,
162    pub op: Operator,
163    pub value: Value,
164}
165
166impl PartitionExpr {
167    pub fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
168        Self {
169            column: column.into(),
170            op,
171            value,
172        }
173    }
174
175    pub fn value(&self) -> &Value {
176        &self.value
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_def() {
        // PartitionDef -> MetaPartition
        let def = PartitionDef {
            partition_columns: vec!["a".to_string(), "b".to_string()],
            partition_bounds: vec![
                PartitionBound::MaxValue,
                PartitionBound::Value(1_i32.into()),
            ],
        };
        assert_eq!("MAXVALUE, 1", def.to_string());

        let partition: MetaPartition = def.try_into().unwrap();
        assert_eq!(
            r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,
            serde_json::to_string(&partition).unwrap(),
        );

        // MetaPartition -> PartitionDef
        let partition = &MetaPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![
                b"\"MaxValue\"".to_vec(),
                b"{\"Value\":{\"Int32\":1}}".to_vec(),
            ],
        };
        let def: PartitionDef = partition.try_into().unwrap();
        assert_eq!(
            def.partition_columns,
            vec!["a".to_string(), "b".to_string()]
        );
        assert_eq!(
            def.partition_bounds,
            vec![
                PartitionBound::MaxValue,
                PartitionBound::Value(1_i32.into())
            ]
        );
    }

    #[test]
    fn test_partition_def_from_malformed_bound() {
        // A bound that is not JSON must surface as an error, not be skipped or defaulted.
        let partition = MetaPartition {
            column_list: vec![b"a".to_vec()],
            value_list: vec![b"not a partition bound".to_vec()],
        };
        assert!(PartitionDef::try_from(&partition).is_err());
    }

    #[test]
    fn test_empty_partition_def() {
        let def = PartitionDef::new(vec![], vec![]);
        assert_eq!("", def.to_string());

        let partition: MetaPartition = def.try_into().unwrap();
        assert!(partition.column_list.is_empty());
        assert!(partition.value_list.is_empty());

        let def = PartitionDef::try_from(&partition).unwrap();
        assert!(def.partition_columns().is_empty());
        assert!(def.partition_bounds().is_empty());
    }

    #[test]
    fn test_partition_bound() {
        let b1 = PartitionBound::Value(1_i32.into());
        let b2 = PartitionBound::Value(100_i32.into());
        let b3 = PartitionBound::MaxValue;
        assert!(b1 < b2);
        assert!(b2 < b3);
    }
}