partition/
partition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::{Debug, Display, Formatter};
18use std::sync::Arc;
19
20use common_meta::rpc::router::Partition as MetaPartition;
21use datatypes::arrow::array::{BooleanArray, RecordBatch};
22use datatypes::prelude::Value;
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use snafu::ResultExt;
26use store_api::storage::RegionNumber;
27
28use crate::error::{self, Error, Result};
29
30pub type PartitionRuleRef = Arc<dyn PartitionRule>;
31
32pub trait PartitionRule: Sync + Send {
33    fn as_any(&self) -> &dyn Any;
34
35    fn partition_columns(&self) -> Vec<String>;
36
37    /// Finds the target region by the partition values.
38    ///
39    /// Note that the `values` should have the same length as the `partition_columns`.
40    fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
41
42    /// Split the record batch into multiple regions by the partition values.
43    /// The result is a map from region mask in which the array is true for the rows that match the partition values.
44    /// Region with now rows selected may not appear in result map.
45    fn split_record_batch(
46        &self,
47        record_batch: &RecordBatch,
48    ) -> Result<HashMap<RegionNumber, RegionMask>>;
49}
50
51/// The right bound(exclusive) of partition range.
52#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
53pub enum PartitionBound {
54    Value(Value),
55    MaxValue,
56    Expr(crate::expr::PartitionExpr),
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct PartitionDef {
61    partition_columns: Vec<String>,
62    partition_bounds: Vec<PartitionBound>,
63}
64
65impl Display for PartitionBound {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::Value(v) => write!(f, "{}", v),
69            Self::MaxValue => write!(f, "MAXVALUE"),
70            Self::Expr(e) => write!(f, "{}", e),
71        }
72    }
73}
74
75impl Display for PartitionDef {
76    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77        write!(
78            f,
79            "{}",
80            self.partition_bounds
81                .iter()
82                .map(|b| format!("{b}"))
83                .join(", ")
84        )
85    }
86}
87
88impl PartitionDef {
89    pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
90        Self {
91            partition_columns,
92            partition_bounds,
93        }
94    }
95
96    pub fn partition_columns(&self) -> &Vec<String> {
97        &self.partition_columns
98    }
99
100    pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
101        &self.partition_bounds
102    }
103}
104
105impl TryFrom<&MetaPartition> for PartitionDef {
106    type Error = Error;
107
108    fn try_from(partition: &MetaPartition) -> Result<Self> {
109        let MetaPartition {
110            column_list,
111            value_list,
112        } = partition;
113
114        let partition_columns = column_list
115            .iter()
116            .map(|x| String::from_utf8_lossy(x).to_string())
117            .collect::<Vec<String>>();
118
119        let partition_bounds = value_list
120            .iter()
121            .map(|x| serde_json::from_str(&String::from_utf8_lossy(x)))
122            .collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
123            .context(error::DeserializeJsonSnafu)?;
124
125        Ok(PartitionDef {
126            partition_columns,
127            partition_bounds,
128        })
129    }
130}
131
132impl TryFrom<PartitionDef> for MetaPartition {
133    type Error = Error;
134
135    fn try_from(partition: PartitionDef) -> Result<Self> {
136        let PartitionDef {
137            partition_columns: columns,
138            partition_bounds: bounds,
139        } = partition;
140
141        let column_list = columns
142            .into_iter()
143            .map(|x| x.into_bytes())
144            .collect::<Vec<Vec<u8>>>();
145
146        let value_list = bounds
147            .into_iter()
148            .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
149            .collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
150            .context(error::SerializeJsonSnafu)?;
151
152        Ok(MetaPartition {
153            column_list,
154            value_list,
155        })
156    }
157}
158
159pub struct RegionMask {
160    array: BooleanArray,
161    selected_rows: usize,
162}
163
164impl From<BooleanArray> for RegionMask {
165    fn from(array: BooleanArray) -> Self {
166        let selected_rows = array.true_count();
167        Self {
168            array,
169            selected_rows,
170        }
171    }
172}
173
174impl RegionMask {
175    pub fn new(array: BooleanArray, selected_rows: usize) -> Self {
176        Self {
177            array,
178            selected_rows,
179        }
180    }
181
182    pub fn array(&self) -> &BooleanArray {
183        &self.array
184    }
185
186    /// All rows are selected.
187    pub fn select_all(&self) -> bool {
188        self.selected_rows == self.array.len()
189    }
190
191    /// No row is selected.
192    pub fn select_none(&self) -> bool {
193        self.selected_rows == 0
194    }
195
196    pub fn selected_rows(&self) -> usize {
197        self.selected_rows
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the text rendering of a definition and its conversion to and from
    /// `MetaPartition`.
    #[test]
    fn test_partition_def() {
        let columns = vec!["a".to_string(), "b".to_string()];
        let bounds = vec![
            PartitionBound::MaxValue,
            PartitionBound::Value(1_i32.into()),
        ];

        // PartitionDef -> MetaPartition
        let def = PartitionDef::new(columns.clone(), bounds.clone());
        assert_eq!("MAXVALUE, 1", def.to_string());

        let encoded = MetaPartition::try_from(def).unwrap();
        assert_eq!(
            r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,
            serde_json::to_string(&encoded).unwrap(),
        );

        // MetaPartition -> PartitionDef
        let encoded = MetaPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![
                b"\"MaxValue\"".to_vec(),
                b"{\"Value\":{\"Int32\":1}}".to_vec(),
            ],
        };
        let decoded = PartitionDef::try_from(&encoded).unwrap();
        assert_eq!(decoded.partition_columns(), &columns);
        assert_eq!(decoded.partition_bounds(), &bounds);
    }

    /// Checks that value bounds order by their value and that `MaxValue` sorts after them.
    #[test]
    fn test_partition_bound() {
        let small = PartitionBound::Value(1_i32.into());
        let large = PartitionBound::Value(100_i32.into());
        let max = PartitionBound::MaxValue;
        assert!(small < large);
        assert!(large < max);
    }
}