1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::{Debug, Display, Formatter};
18use std::sync::Arc;
19
20use common_meta::rpc::router::Partition as MetaPartition;
21use datafusion_expr::Operator;
22use datatypes::arrow::array::{BooleanArray, RecordBatch};
23use datatypes::prelude::Value;
24use itertools::Itertools;
25use serde::{Deserialize, Serialize};
26use snafu::ResultExt;
27use store_api::storage::RegionNumber;
28
29use crate::error::{self, Error, Result};
30
31pub type PartitionRuleRef = Arc<dyn PartitionRule>;
32
33pub trait PartitionRule: Sync + Send {
34 fn as_any(&self) -> &dyn Any;
35
36 fn partition_columns(&self) -> Vec<String>;
37
38 fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
42
43 fn split_record_batch(
46 &self,
47 record_batch: &RecordBatch,
48 ) -> Result<HashMap<RegionNumber, BooleanArray>>;
49}
50
/// A single partition bound, as stored (JSON-encoded) in partition metadata.
///
/// NOTE: the derived `PartialOrd`/`Ord` compare variants in declaration order first,
/// so every `Value(_)` sorts before `MaxValue`, which sorts before every `Expr(_)`.
/// Bounds of the same variant compare by their payload. Do not reorder the variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum PartitionBound {
    /// A concrete bound value.
    Value(Value),
    /// Rendered as `MAXVALUE` by `Display`; presumably an open upper bound
    /// (SQL-style `MAXVALUE`).
    MaxValue,
    /// A partition expression of type `crate::expr::PartitionExpr`. This is NOT the
    /// `PartitionExpr` struct declared in this module.
    Expr(crate::expr::PartitionExpr),
}
58
/// A partition definition: the partition column names plus the partition bounds.
///
/// Convertible to and from `MetaPartition` via the `TryFrom` impls in this module. There,
/// column names are stored as UTF-8 bytes and each bound as its JSON encoding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionDef {
    /// Names of the columns the partition is defined over.
    partition_columns: Vec<String>,
    /// Bounds of the partition; `Display` renders them comma-separated.
    /// NOTE(review): presumably one bound per entry of `partition_columns` — confirm.
    partition_bounds: Vec<PartitionBound>,
}
64
65impl Display for PartitionBound {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 match self {
68 Self::Value(v) => write!(f, "{}", v),
69 Self::MaxValue => write!(f, "MAXVALUE"),
70 Self::Expr(e) => write!(f, "{}", e),
71 }
72 }
73}
74
75impl Display for PartitionDef {
76 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
77 write!(
78 f,
79 "{}",
80 self.partition_bounds
81 .iter()
82 .map(|b| format!("{b}"))
83 .join(", ")
84 )
85 }
86}
87
88impl PartitionDef {
89 pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
90 Self {
91 partition_columns,
92 partition_bounds,
93 }
94 }
95
96 pub fn partition_columns(&self) -> &Vec<String> {
97 &self.partition_columns
98 }
99
100 pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
101 &self.partition_bounds
102 }
103}
104
105impl TryFrom<&MetaPartition> for PartitionDef {
106 type Error = Error;
107
108 fn try_from(partition: &MetaPartition) -> Result<Self> {
109 let MetaPartition {
110 column_list,
111 value_list,
112 } = partition;
113
114 let partition_columns = column_list
115 .iter()
116 .map(|x| String::from_utf8_lossy(x).to_string())
117 .collect::<Vec<String>>();
118
119 let partition_bounds = value_list
120 .iter()
121 .map(|x| serde_json::from_str(&String::from_utf8_lossy(x)))
122 .collect::<std::result::Result<Vec<PartitionBound>, serde_json::Error>>()
123 .context(error::DeserializeJsonSnafu)?;
124
125 Ok(PartitionDef {
126 partition_columns,
127 partition_bounds,
128 })
129 }
130}
131
132impl TryFrom<PartitionDef> for MetaPartition {
133 type Error = Error;
134
135 fn try_from(partition: PartitionDef) -> Result<Self> {
136 let PartitionDef {
137 partition_columns: columns,
138 partition_bounds: bounds,
139 } = partition;
140
141 let column_list = columns
142 .into_iter()
143 .map(|x| x.into_bytes())
144 .collect::<Vec<Vec<u8>>>();
145
146 let value_list = bounds
147 .into_iter()
148 .map(|x| serde_json::to_string(&x).map(|s| s.into_bytes()))
149 .collect::<std::result::Result<Vec<Vec<u8>>, serde_json::Error>>()
150 .context(error::SerializeJsonSnafu)?;
151
152 Ok(MetaPartition {
153 column_list,
154 value_list,
155 })
156 }
157}
158
/// A simple `column <op> value` predicate.
///
/// NOTE: this is a different type from `crate::expr::PartitionExpr`, which is the type
/// held by `PartitionBound::Expr`.
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionExpr {
    /// Name of the column the predicate refers to.
    pub column: String,
    /// Operator applied between `column` and `value`.
    pub op: Operator,
    /// Operand compared against `column` (presumably the right-hand side — confirm).
    pub value: Value,
}
165
166impl PartitionExpr {
167 pub fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
168 Self {
169 column: column.into(),
170 op,
171 value,
172 }
173 }
174
175 pub fn value(&self) -> &Value {
176 &self.value
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_def() {
        let columns = vec!["a".to_string(), "b".to_string()];
        let bounds = vec![
            PartitionBound::MaxValue,
            PartitionBound::Value(1_i32.into()),
        ];

        // `Display` joins the bounds with ", ".
        let def = PartitionDef::new(columns.clone(), bounds.clone());
        assert_eq!("MAXVALUE, 1", def.to_string());

        // PartitionDef -> MetaPartition stores every bound as JSON bytes.
        let encoded = MetaPartition::try_from(def).unwrap();
        assert_eq!(
            r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#,
            serde_json::to_string(&encoded).unwrap(),
        );

        // MetaPartition -> PartitionDef parses those JSON bytes back.
        let raw = MetaPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![
                b"\"MaxValue\"".to_vec(),
                b"{\"Value\":{\"Int32\":1}}".to_vec(),
            ],
        };
        let decoded = PartitionDef::try_from(&raw).unwrap();
        assert_eq!(decoded.partition_columns, columns);
        assert_eq!(decoded.partition_bounds, bounds);
    }

    #[test]
    fn test_partition_bound() {
        let small = PartitionBound::Value(1_i32.into());
        let large = PartitionBound::Value(100_i32.into());
        let max = PartitionBound::MaxValue;
        // Same-variant bounds order by payload; any Value orders before MaxValue.
        assert!(small < large);
        assert!(large < max);
    }
}