1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::{Debug, Display, Formatter};
18use std::sync::Arc;
19
20use common_meta::rpc::router::Partition as MetaPartition;
21use datafusion_expr::Operator;
22use datatypes::arrow::array::{BooleanArray, RecordBatch};
23use datatypes::prelude::Value;
24use itertools::Itertools;
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::RegionNumber;
28
29use crate::error::{self, Error, Result};
30
31pub type PartitionRuleRef = Arc<dyn PartitionRule>;
32
33pub trait PartitionRule: Sync + Send {
34 fn as_any(&self) -> &dyn Any;
35
36 fn partition_columns(&self) -> Vec<String>;
37
38 fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
42
43 fn split_record_batch(
47 &self,
48 record_batch: &RecordBatch,
49 ) -> Result<HashMap<RegionNumber, RegionMask>>;
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
54pub enum PartitionBound {
55 Value(Value),
56 MaxValue,
57 Expr(crate::expr::PartitionExpr),
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct PartitionDef {
62 partition_columns: Vec<String>,
63 partition_bounds: Vec<PartitionBound>,
64}
65
66impl Display for PartitionBound {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::Value(v) => write!(f, "{}", v),
70 Self::MaxValue => write!(f, "MAXVALUE"),
71 Self::Expr(e) => write!(f, "{}", e),
72 }
73 }
74}
75
76impl Display for PartitionDef {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78 write!(
79 f,
80 "{}",
81 self.partition_bounds
82 .iter()
83 .map(|b| format!("{b}"))
84 .join(", ")
85 )
86 }
87}
88
89impl PartitionDef {
90 pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
91 Self {
92 partition_columns,
93 partition_bounds,
94 }
95 }
96
97 pub fn partition_columns(&self) -> &Vec<String> {
98 &self.partition_columns
99 }
100
101 pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
102 &self.partition_bounds
103 }
104}
105
106impl TryFrom<&MetaPartition> for PartitionDef {
107 type Error = Error;
108
109 fn try_from(partition: &MetaPartition) -> Result<Self> {
110 let MetaPartition {
111 column_list,
112 value_list,
113 } = partition;
114
115 let partition_columns = column_list
116 .iter()
117 .map(|x| String::from_utf8_lossy(x).to_string())
118 .collect::<Vec<String>>();
119
120 let partition_bounds = value_list
121 .iter()
122 .map(|x| serde_json::from_str(&String::from_utf8_lossy(x)))
123 .collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
124 .context(error::DeserializeJsonSnafu)?;
125
126 Ok(PartitionDef {
127 partition_columns,
128 partition_bounds,
129 })
130 }
131}
132
133impl TryFrom<PartitionDef> for MetaPartition {
134 type Error = Error;
135
136 fn try_from(partition: PartitionDef) -> Result<Self> {
137 let PartitionDef {
138 partition_columns: columns,
139 partition_bounds: bounds,
140 } = partition;
141
142 let column_list = columns
143 .into_iter()
144 .map(|x| x.into_bytes())
145 .collect::<Vec<Vec<u8>>>();
146
147 let value_list = bounds
148 .into_iter()
149 .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
150 .collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
151 .context(error::SerializeJsonSnafu)?;
152
153 Ok(MetaPartition {
154 column_list,
155 value_list,
156 })
157 }
158}
159
160#[derive(Debug, PartialEq, Eq)]
161pub struct PartitionExpr {
162 pub column: String,
163 pub op: Operator,
164 pub value: Value,
165}
166
167impl PartitionExpr {
168 pub fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
169 Self {
170 column: column.into(),
171 op,
172 value,
173 }
174 }
175
176 pub fn value(&self) -> &Value {
177 &self.value
178 }
179}
180
181pub struct RegionMask {
182 array: BooleanArray,
183 selected_rows: usize,
184}
185
186impl From<BooleanArray> for RegionMask {
187 fn from(array: BooleanArray) -> Self {
188 let selected_rows = array.true_count();
189 Self {
190 array,
191 selected_rows,
192 }
193 }
194}
195
196impl RegionMask {
197 pub fn new(array: BooleanArray, selected_rows: usize) -> Self {
198 Self {
199 array,
200 selected_rows,
201 }
202 }
203
204 pub fn array(&self) -> &BooleanArray {
205 &self.array
206 }
207
208 pub fn select_all(&self) -> bool {
210 self.selected_rows == self.array.len()
211 }
212
213 pub fn select_none(&self) -> bool {
215 self.selected_rows == 0
216 }
217
218 pub fn selected_rows(&self) -> usize {
219 self.selected_rows
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `Display` output and the conversions between `PartitionDef`
    /// and `MetaPartition` in both directions.
    #[test]
    fn test_partition_def() {
        let columns = vec!["a".to_string(), "b".to_string()];
        let bounds = vec![
            PartitionBound::MaxValue,
            PartitionBound::Value(1_i32.into()),
        ];
        let def = PartitionDef::new(columns, bounds);
        assert_eq!("MAXVALUE, 1", def.to_string());

        // PartitionDef -> MetaPartition
        let encoded = MetaPartition::try_from(def).unwrap();
        assert_eq!(
            r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,
            serde_json::to_string(&encoded).unwrap(),
        );

        // MetaPartition -> PartitionDef
        let meta = MetaPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![
                b"\"MaxValue\"".to_vec(),
                b"{\"Value\":{\"Int32\":1}}".to_vec(),
            ],
        };
        let decoded = PartitionDef::try_from(&meta).unwrap();
        assert_eq!(
            decoded.partition_columns,
            vec!["a".to_string(), "b".to_string()]
        );
        assert_eq!(
            decoded.partition_bounds,
            vec![
                PartitionBound::MaxValue,
                PartitionBound::Value(1_i32.into())
            ]
        );
    }

    /// Checks the derived ordering: smaller values first, `MaxValue` above
    /// any value.
    #[test]
    fn test_partition_bound() {
        let low = PartitionBound::Value(1_i32.into());
        let high = PartitionBound::Value(100_i32.into());
        let max = PartitionBound::MaxValue;
        assert!(low < high);
        assert!(high < max);
    }
}