partition/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::{Debug, Display, Formatter};
18use std::sync::Arc;
19
20use common_meta::rpc::router::Partition as MetaPartition;
21use datafusion_expr::Operator;
22use datatypes::arrow::array::{BooleanArray, RecordBatch};
23use datatypes::prelude::Value;
24use itertools::Itertools;
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::RegionNumber;
28
29use crate::error::{self, Error, Result};
30
31pub type PartitionRuleRef = Arc<dyn PartitionRule>;
32
33pub trait PartitionRule: Sync + Send {
34    fn as_any(&self) -> &dyn Any;
35
36    fn partition_columns(&self) -> Vec<String>;
37
38    /// Finds the target region by the partition values.
39    ///
40    /// Note that the `values` should have the same length as the `partition_columns`.
41    fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
42
43    /// Split the record batch into multiple regions by the partition values.
44    /// The result is a map from region mask in which the array is true for the rows that match the partition values.
45    /// Region with now rows selected may not appear in result map.
46    fn split_record_batch(
47        &self,
48        record_batch: &RecordBatch,
49    ) -> Result<HashMap<RegionNumber, RegionMask>>;
50}
51
52/// The right bound(exclusive) of partition range.
53#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
54pub enum PartitionBound {
55    Value(Value),
56    MaxValue,
57    Expr(crate::expr::PartitionExpr),
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct PartitionDef {
62    partition_columns: Vec<String>,
63    partition_bounds: Vec<PartitionBound>,
64}
65
66impl Display for PartitionBound {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        match self {
69            Self::Value(v) => write!(f, "{}", v),
70            Self::MaxValue => write!(f, "MAXVALUE"),
71            Self::Expr(e) => write!(f, "{}", e),
72        }
73    }
74}
75
76impl Display for PartitionDef {
77    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78        write!(
79            f,
80            "{}",
81            self.partition_bounds
82                .iter()
83                .map(|b| format!("{b}"))
84                .join(", ")
85        )
86    }
87}
88
89impl PartitionDef {
90    pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
91        Self {
92            partition_columns,
93            partition_bounds,
94        }
95    }
96
97    pub fn partition_columns(&self) -> &Vec<String> {
98        &self.partition_columns
99    }
100
101    pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
102        &self.partition_bounds
103    }
104}
105
106impl TryFrom<&MetaPartition> for PartitionDef {
107    type Error = Error;
108
109    fn try_from(partition: &MetaPartition) -> Result<Self> {
110        let MetaPartition {
111            column_list,
112            value_list,
113        } = partition;
114
115        let partition_columns = column_list
116            .iter()
117            .map(|x| String::from_utf8_lossy(x).to_string())
118            .collect::<Vec<String>>();
119
120        let partition_bounds = value_list
121            .iter()
122            .map(|x| serde_json::from_str(&String::from_utf8_lossy(x)))
123            .collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
124            .context(error::DeserializeJsonSnafu)?;
125
126        Ok(PartitionDef {
127            partition_columns,
128            partition_bounds,
129        })
130    }
131}
132
133impl TryFrom<PartitionDef> for MetaPartition {
134    type Error = Error;
135
136    fn try_from(partition: PartitionDef) -> Result<Self> {
137        let PartitionDef {
138            partition_columns: columns,
139            partition_bounds: bounds,
140        } = partition;
141
142        let column_list = columns
143            .into_iter()
144            .map(|x| x.into_bytes())
145            .collect::<Vec<Vec<u8>>>();
146
147        let value_list = bounds
148            .into_iter()
149            .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
150            .collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
151            .context(error::SerializeJsonSnafu)?;
152
153        Ok(MetaPartition {
154            column_list,
155            value_list,
156        })
157    }
158}
159
160#[derive(Debug, PartialEq, Eq)]
161pub struct PartitionExpr {
162    pub column: String,
163    pub op: Operator,
164    pub value: Value,
165}
166
167impl PartitionExpr {
168    pub fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
169        Self {
170            column: column.into(),
171            op,
172            value,
173        }
174    }
175
176    pub fn value(&self) -> &Value {
177        &self.value
178    }
179}
180
181pub struct RegionMask {
182    array: BooleanArray,
183    selected_rows: usize,
184}
185
186impl From<BooleanArray> for RegionMask {
187    fn from(array: BooleanArray) -> Self {
188        let selected_rows = array.true_count();
189        Self {
190            array,
191            selected_rows,
192        }
193    }
194}
195
196impl RegionMask {
197    pub fn new(array: BooleanArray, selected_rows: usize) -> Self {
198        Self {
199            array,
200            selected_rows,
201        }
202    }
203
204    pub fn array(&self) -> &BooleanArray {
205        &self.array
206    }
207
208    /// All rows are selected.
209    pub fn select_all(&self) -> bool {
210        self.selected_rows == self.array.len()
211    }
212
213    /// No row is selected.
214    pub fn select_none(&self) -> bool {
215        self.selected_rows == 0
216    }
217
218    pub fn selected_rows(&self) -> usize {
219        self.selected_rows
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_def() {
        // Encode: PartitionDef -> MetaPartition.
        let columns = vec!["a".to_string(), "b".to_string()];
        let bounds = vec![
            PartitionBound::MaxValue,
            PartitionBound::Value(1_i32.into()),
        ];
        let def = PartitionDef::new(columns, bounds);
        assert_eq!(def.to_string(), "MAXVALUE, 1");

        let encoded = MetaPartition::try_from(def).unwrap();
        let json = serde_json::to_string(&encoded).unwrap();
        assert_eq!(
            json,
            r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,
        );

        // Decode: MetaPartition -> PartitionDef.
        let meta = MetaPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![
                b"\"MaxValue\"".to_vec(),
                b"{\"Value\":{\"Int32\":1}}".to_vec(),
            ],
        };
        let decoded = PartitionDef::try_from(&meta).unwrap();
        assert_eq!(
            decoded.partition_columns(),
            &vec!["a".to_string(), "b".to_string()]
        );
        assert_eq!(
            decoded.partition_bounds(),
            &vec![
                PartitionBound::MaxValue,
                PartitionBound::Value(1_i32.into())
            ]
        );
    }

    #[test]
    fn test_partition_bound() {
        let one = PartitionBound::Value(1_i32.into());
        let hundred = PartitionBound::Value(100_i32.into());
        let max = PartitionBound::MaxValue;
        assert!(one < hundred);
        assert!(hundred < max);
    }
}