1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::{Debug, Display, Formatter};
18use std::sync::Arc;
19
20use common_meta::rpc::router::Partition as MetaPartition;
21use datatypes::arrow::array::{BooleanArray, RecordBatch};
22use datatypes::prelude::Value;
23use itertools::Itertools;
24use serde::{Deserialize, Serialize};
25use snafu::ResultExt;
26use store_api::storage::RegionNumber;
27
28use crate::error::{self, Error, Result};
29
/// Shared, reference-counted handle to a [`PartitionRule`] trait object.
///
/// Because `PartitionRule` requires `Send + Sync`, this handle can be shared across threads.
pub type PartitionRuleRef = Arc<dyn PartitionRule>;
31
/// A rule that decides which region of a partitioned table a row belongs to.
///
/// Implementors must be `Send + Sync` so that a rule can be shared through
/// [`PartitionRuleRef`].
pub trait PartitionRule: Sync + Send {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete rule type.
    fn as_any(&self) -> &dyn Any;

    /// Returns the names of the columns this rule partitions on.
    fn partition_columns(&self) -> Vec<String>;

    /// Finds the region for a single row described by `values`.
    ///
    /// NOTE(review): `values` presumably holds one value per partition column, in the
    /// order returned by [`PartitionRule::partition_columns`] — confirm with implementors.
    fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;

    /// Splits `record_batch` into per-region row selections.
    ///
    /// Returns a [`RegionMask`] keyed by region number. NOTE(review): whether regions
    /// that select no rows appear in the map is left to the implementation — confirm.
    fn split_record_batch(
        &self,
        record_batch: &RecordBatch,
    ) -> Result<HashMap<RegionNumber, RegionMask>>;
}
50
/// A single partition bound.
///
/// The derived `PartialOrd`/`Ord` follow variant declaration order. Every `Value` bound
/// therefore sorts before `MaxValue`, which sorts before every `Expr` bound. Two `Value`
/// bounds compare by their inner value. Do not reorder the variants: that changes the
/// ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum PartitionBound {
    /// A concrete bound value.
    Value(Value),
    /// The `MAXVALUE` sentinel; greater than any `Value` bound.
    MaxValue,
    /// A partition expression.
    Expr(crate::expr::PartitionExpr),
}
58
/// A partition definition: the partition columns plus a list of partition bounds.
///
/// Converted to and from [`MetaPartition`] (see the `TryFrom` impls in this module).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionDef {
    /// Names of the columns the partition is defined on.
    partition_columns: Vec<String>,
    /// The partition bounds, kept in the order they were supplied.
    partition_bounds: Vec<PartitionBound>,
}
64
65impl Display for PartitionBound {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 match self {
68 Self::Value(v) => write!(f, "{}", v),
69 Self::MaxValue => write!(f, "MAXVALUE"),
70 Self::Expr(e) => write!(f, "{}", e),
71 }
72 }
73}
74
75impl Display for PartitionDef {
76 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77 write!(
78 f,
79 "{}",
80 self.partition_bounds
81 .iter()
82 .map(|b| format!("{b}"))
83 .join(", ")
84 )
85 }
86}
87
88impl PartitionDef {
89 pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
90 Self {
91 partition_columns,
92 partition_bounds,
93 }
94 }
95
96 pub fn partition_columns(&self) -> &Vec<String> {
97 &self.partition_columns
98 }
99
100 pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
101 &self.partition_bounds
102 }
103}
104
105impl TryFrom<&MetaPartition> for PartitionDef {
106 type Error = Error;
107
108 fn try_from(partition: &MetaPartition) -> Result<Self> {
109 let MetaPartition {
110 column_list,
111 value_list,
112 } = partition;
113
114 let partition_columns = column_list
115 .iter()
116 .map(|x| String::from_utf8_lossy(x).to_string())
117 .collect::<Vec<String>>();
118
119 let partition_bounds = value_list
120 .iter()
121 .map(|x| serde_json::from_str(&String::from_utf8_lossy(x)))
122 .collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
123 .context(error::DeserializeJsonSnafu)?;
124
125 Ok(PartitionDef {
126 partition_columns,
127 partition_bounds,
128 })
129 }
130}
131
132impl TryFrom<PartitionDef> for MetaPartition {
133 type Error = Error;
134
135 fn try_from(partition: PartitionDef) -> Result<Self> {
136 let PartitionDef {
137 partition_columns: columns,
138 partition_bounds: bounds,
139 } = partition;
140
141 let column_list = columns
142 .into_iter()
143 .map(|x| x.into_bytes())
144 .collect::<Vec<Vec<u8>>>();
145
146 let value_list = bounds
147 .into_iter()
148 .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
149 .collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
150 .context(error::SerializeJsonSnafu)?;
151
152 Ok(MetaPartition {
153 column_list,
154 value_list,
155 })
156 }
157}
158
/// Row selection for one region: a boolean mask plus a cached count of selected rows.
pub struct RegionMask {
    /// Selection array; `true` entries mark selected rows.
    array: BooleanArray,
    /// Cached number of selected rows. The `From<BooleanArray>` impl computes it with
    /// `true_count()`; callers of `RegionMask::new` supply it themselves.
    selected_rows: usize,
}
163
164impl From<BooleanArray> for RegionMask {
165 fn from(array: BooleanArray) -> Self {
166 let selected_rows = array.true_count();
167 Self {
168 array,
169 selected_rows,
170 }
171 }
172}
173
174impl RegionMask {
175 pub fn new(array: BooleanArray, selected_rows: usize) -> Self {
176 Self {
177 array,
178 selected_rows,
179 }
180 }
181
182 pub fn array(&self) -> &BooleanArray {
183 &self.array
184 }
185
186 pub fn select_all(&self) -> bool {
188 self.selected_rows == self.array.len()
189 }
190
191 pub fn select_none(&self) -> bool {
193 self.selected_rows == 0
194 }
195
196 pub fn selected_rows(&self) -> usize {
197 self.selected_rows
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a `PartitionDef` through `Display` and both `MetaPartition` conversions.
    #[test]
    fn test_partition_def() {
        let columns = vec!["a".to_string(), "b".to_string()];
        let bounds = vec![
            PartitionBound::MaxValue,
            PartitionBound::Value(1_i32.into()),
        ];

        let def = PartitionDef {
            partition_columns: columns.clone(),
            partition_bounds: bounds.clone(),
        };
        assert_eq!("MAXVALUE, 1", def.to_string());

        // Encoding: bounds are stored as JSON text bytes.
        let encoded: MetaPartition = def.try_into().unwrap();
        assert_eq!(
            r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,
            serde_json::to_string(&encoded).unwrap(),
        );

        // Decoding from hand-written metadata bytes.
        let raw = MetaPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![
                b"\"MaxValue\"".to_vec(),
                b"{\"Value\":{\"Int32\":1}}".to_vec(),
            ],
        };
        let decoded = PartitionDef::try_from(&raw).unwrap();
        assert_eq!(decoded.partition_columns, columns);
        assert_eq!(decoded.partition_bounds, bounds);
    }

    /// Value bounds compare by value; `MaxValue` is above every value bound.
    #[test]
    fn test_partition_bound() {
        let low = PartitionBound::Value(1_i32.into());
        let high = PartitionBound::Value(100_i32.into());
        let max = PartitionBound::MaxValue;
        assert!(low < high);
        assert!(high < max);
    }
}