1use std::sync::Arc;
16
17use common_query::AddColumnLocation;
18use datatypes::types::cast;
19use rand::Rng;
20use snafu::{OptionExt, ensure};
21
22use crate::error::{self, Result};
23use crate::generator::Random;
24use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption};
25use crate::ir::create_expr::{ColumnOption, PartitionDef};
26use crate::ir::partition_expr::SimplePartitions;
27use crate::ir::repartition_expr::RepartitionExpr;
28use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident};
29
30pub type TableContextRef = Arc<TableContext>;
31
32#[derive(Debug, Clone)]
34pub struct TableContext {
35 pub name: Ident,
36 pub columns: Vec<Column>,
37
38 pub partition: Option<PartitionDef>,
40 pub primary_keys: Vec<usize>,
41 pub table_options: Vec<AlterTableOption>,
42}
43
44impl From<&CreateTableExpr> for TableContext {
45 fn from(
46 CreateTableExpr {
47 table_name: name,
48 columns,
49 partition,
50 primary_keys,
51 ..
52 }: &CreateTableExpr,
53 ) -> Self {
54 Self {
55 name: name.clone(),
56 columns: columns.clone(),
57 partition: partition.clone(),
58 primary_keys: primary_keys.clone(),
59 table_options: vec![],
60 }
61 }
62}
63
64impl TableContext {
65 pub fn timestamp_column(&self) -> Option<Column> {
67 self.columns.iter().find(|c| c.is_time_index()).cloned()
68 }
69
70 pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
72 match expr.alter_kinds {
73 AlterTableOperation::AddColumn { column, location } => {
74 ensure!(
75 !self.columns.iter().any(|col| col.name == column.name),
76 error::UnexpectedSnafu {
77 violated: format!("Column {} exists", column.name),
78 }
79 );
80 match location {
81 Some(AddColumnLocation::First) => {
82 let mut columns = Vec::with_capacity(self.columns.len() + 1);
83 columns.push(column);
84 columns.extend(self.columns);
85 self.columns = columns;
86 }
87 Some(AddColumnLocation::After { column_name }) => {
88 let index = self
89 .columns
90 .iter()
91 .position(|col| col.name.to_string() == column_name)
93 .context(error::UnexpectedSnafu {
94 violated: format!("Column: {column_name} not found"),
95 })?;
96 self.columns.insert(index + 1, column);
97 }
98 None => self.columns.push(column),
99 }
100 self.primary_keys = self
102 .columns
103 .iter()
104 .enumerate()
105 .flat_map(|(idx, col)| {
106 if col.is_primary_key() {
107 Some(idx)
108 } else {
109 None
110 }
111 })
112 .collect();
113 Ok(self)
114 }
115 AlterTableOperation::DropColumn { name } => {
116 self.columns.retain(|col| col.name != name);
117 self.primary_keys = self
119 .columns
120 .iter()
121 .enumerate()
122 .flat_map(|(idx, col)| {
123 if col.is_primary_key() {
124 Some(idx)
125 } else {
126 None
127 }
128 })
129 .collect();
130 Ok(self)
131 }
132 AlterTableOperation::RenameTable { new_table_name } => {
133 ensure!(
134 new_table_name != self.name,
135 error::UnexpectedSnafu {
136 violated: "The new table name is equal the current name",
137 }
138 );
139 self.name = new_table_name;
140 Ok(self)
141 }
142 AlterTableOperation::ModifyDataType { column } => {
143 if let Some(idx) = self.columns.iter().position(|col| col.name == column.name) {
144 self.columns[idx].column_type = column.column_type.clone();
145 for opt in self.columns[idx].options.iter_mut() {
146 if let ColumnOption::DefaultValue(value) = opt {
147 *value = cast(value.clone(), &column.column_type).unwrap();
148 }
149 }
150 }
151 Ok(self)
152 }
153 AlterTableOperation::SetTableOptions { options } => {
154 for option in options {
155 if let Some(idx) = self
156 .table_options
157 .iter()
158 .position(|opt| opt.key() == option.key())
159 {
160 self.table_options[idx] = option;
161 } else {
162 self.table_options.push(option);
163 }
164 }
165 Ok(self)
166 }
167 AlterTableOperation::UnsetTableOptions { keys } => {
168 self.table_options
169 .retain(|opt| !keys.contains(&opt.key().to_string()));
170 Ok(self)
171 }
172 }
173 }
174
175 pub fn repartition(mut self, expr: RepartitionExpr) -> Result<TableContext> {
176 match expr {
177 RepartitionExpr::Split(split) => {
178 let partition_def = self.partition.as_mut().expect("expected partition def");
179 let insert_pos = partition_def
180 .exprs
181 .iter()
182 .position(|expr| expr == &split.target)
183 .unwrap();
184 partition_def.exprs[insert_pos] = split.into[0].clone();
185 partition_def
186 .exprs
187 .insert(insert_pos + 1, split.into[1].clone());
188 }
189 RepartitionExpr::Merge(merge) => {
190 let partition_def = self.partition.as_mut().expect("expected partition def");
191 let removed_idx = partition_def
192 .exprs
193 .iter()
194 .position(|expr| expr == &merge.targets[0])
195 .unwrap();
196 let mut partitions = SimplePartitions::from_exprs(
197 partition_def.columns[0].clone(),
198 &partition_def.exprs,
199 )?;
200 partitions.remove_bound(removed_idx)?;
201 partition_def.exprs = partitions.generate()?;
202 }
203 }
204
205 Ok(self)
206 }
207
208 pub fn generate_unique_column_name<R: Rng>(
209 &self,
210 rng: &mut R,
211 generator: &dyn Random<Ident, R>,
212 ) -> Ident {
213 let mut name = generator.generate(rng);
214 while self.columns.iter().any(|col| col.name.value == name.value) {
215 name = generator.generate(rng);
216 }
217 name
218 }
219
220 pub fn generate_unique_table_name<R: Rng>(
221 &self,
222 rng: &mut R,
223 generator: &dyn Random<Ident, R>,
224 ) -> Ident {
225 let mut name = generator.generate(rng);
226 while self.name.value == name.value {
227 name = generator.generate(rng);
228 }
229 name
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use common_query::AddColumnLocation;
236 use common_time::Duration;
237 use datatypes::data_type::ConcreteDataType;
238 use datatypes::value::Value;
239 use rand::SeedableRng;
240
241 use super::TableContext;
242 use crate::generator::Generator;
243 use crate::generator::create_expr::CreateTableExprGeneratorBuilder;
244 use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption, Ttl};
245 use crate::ir::create_expr::ColumnOption;
246 use crate::ir::partition_expr::SimplePartitions;
247 use crate::ir::repartition_expr::{MergePartitionExpr, RepartitionExpr, SplitPartitionExpr};
248 use crate::ir::{AlterTableExpr, Column, Ident};
249
250 #[test]
251 fn test_table_context_alter() {
252 let table_ctx = TableContext {
253 name: "foo".into(),
254 columns: vec![],
255 partition: None,
256 primary_keys: vec![],
257 table_options: vec![],
258 };
259 let expr = AlterTableExpr {
261 table_name: "foo".into(),
262 alter_kinds: AlterTableOperation::AddColumn {
263 column: Column {
264 name: "a".into(),
265 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
266 options: vec![ColumnOption::PrimaryKey],
267 },
268 location: None,
269 },
270 };
271 let table_ctx = table_ctx.alter(expr).unwrap();
272 assert_eq!(table_ctx.columns[0].name, Ident::new("a"));
273 assert_eq!(table_ctx.primary_keys, vec![0]);
274
275 let expr = AlterTableExpr {
277 table_name: "foo".into(),
278 alter_kinds: AlterTableOperation::AddColumn {
279 column: Column {
280 name: "b".into(),
281 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
282 options: vec![ColumnOption::PrimaryKey],
283 },
284 location: Some(AddColumnLocation::First),
285 },
286 };
287 let table_ctx = table_ctx.alter(expr).unwrap();
288 assert_eq!(table_ctx.columns[0].name, Ident::new("b"));
289 assert_eq!(table_ctx.primary_keys, vec![0, 1]);
290
291 let expr = AlterTableExpr {
293 table_name: "foo".into(),
294 alter_kinds: AlterTableOperation::AddColumn {
295 column: Column {
296 name: "c".into(),
297 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
298 options: vec![ColumnOption::PrimaryKey],
299 },
300 location: Some(AddColumnLocation::After {
301 column_name: "b".into(),
302 }),
303 },
304 };
305 let table_ctx = table_ctx.alter(expr).unwrap();
306 assert_eq!(table_ctx.columns[1].name, Ident::new("c"));
307 assert_eq!(table_ctx.primary_keys, vec![0, 1, 2]);
308
309 let expr = AlterTableExpr {
311 table_name: "foo".into(),
312 alter_kinds: AlterTableOperation::DropColumn { name: "b".into() },
313 };
314 let table_ctx = table_ctx.alter(expr).unwrap();
315 assert_eq!(table_ctx.columns[1].name, Ident::new("a"));
316 assert_eq!(table_ctx.primary_keys, vec![0, 1]);
317
318 let ttl_option = AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60)));
320 let expr = AlterTableExpr {
321 table_name: "foo".into(),
322 alter_kinds: AlterTableOperation::SetTableOptions {
323 options: vec![ttl_option.clone()],
324 },
325 };
326 let table_ctx = table_ctx.alter(expr).unwrap();
327 assert_eq!(table_ctx.table_options.len(), 1);
328 assert_eq!(table_ctx.table_options[0], ttl_option);
329
330 let expr = AlterTableExpr {
332 table_name: "foo".into(),
333 alter_kinds: AlterTableOperation::UnsetTableOptions {
334 keys: vec![ttl_option.key().to_string()],
335 },
336 };
337 let table_ctx = table_ctx.alter(expr).unwrap();
338 assert_eq!(table_ctx.table_options.len(), 0);
339 }
340
341 #[test]
342 fn test_apply_split_partition_expr() {
343 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
344 let expr = CreateTableExprGeneratorBuilder::default()
345 .columns(10)
346 .partition(10)
347 .if_not_exists(true)
348 .engine("mito2")
349 .build()
350 .unwrap()
351 .generate(&mut rng)
352 .unwrap();
353 let mut table_ctx = TableContext::from(&expr);
354 let partitions = SimplePartitions::new(
358 table_ctx.partition.as_ref().unwrap().columns[0].clone(),
359 vec![Value::from(10), Value::from(20)],
360 )
361 .generate()
362 .unwrap();
363 let expected_exprs = SimplePartitions::new(
368 table_ctx.partition.as_ref().unwrap().columns[0].clone(),
369 vec![Value::from(10), Value::from(20), Value::from(30)],
370 )
371 .generate()
372 .unwrap();
373 table_ctx.partition.as_mut().unwrap().exprs = partitions.clone();
374 let table_ctx = table_ctx
375 .repartition(RepartitionExpr::Split(SplitPartitionExpr {
376 table_name: expr.table_name.clone(),
377 target: partitions.last().unwrap().clone(),
378 into: vec![expected_exprs[2].clone(), expected_exprs[3].clone()],
379 }))
380 .unwrap();
381 let partition_def = table_ctx.partition.as_ref().unwrap();
382 assert_eq!(partition_def.exprs, expected_exprs);
383 }
384
385 #[test]
386 fn test_apply_merge_partition_expr() {
387 let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
388 let expr = CreateTableExprGeneratorBuilder::default()
389 .columns(10)
390 .partition(10)
391 .if_not_exists(true)
392 .engine("mito2")
393 .build()
394 .unwrap()
395 .generate(&mut rng)
396 .unwrap();
397 let mut table_ctx = TableContext::from(&expr);
398 let partitions = SimplePartitions::new(
402 table_ctx.partition.as_ref().unwrap().columns[0].clone(),
403 vec![Value::from(10), Value::from(20)],
404 )
405 .generate()
406 .unwrap();
407 let expected_exprs = SimplePartitions::new(
410 table_ctx.partition.as_ref().unwrap().columns[0].clone(),
411 vec![Value::from(10)],
412 )
413 .generate()
414 .unwrap();
415 table_ctx.partition.as_mut().unwrap().exprs = partitions.clone();
416 let table_ctx = table_ctx
417 .repartition(RepartitionExpr::Merge(MergePartitionExpr {
418 table_name: expr.table_name.clone(),
419 targets: vec![partitions[1].clone(), partitions[2].clone()],
420 }))
421 .unwrap();
422 let partition_def = table_ctx.partition.as_ref().unwrap();
423 assert_eq!(partition_def.exprs, expected_exprs);
424 }
425}