1use std::sync::Arc;
16
17use common_query::AddColumnLocation;
18use datatypes::types::cast;
19use rand::Rng;
20use snafu::{OptionExt, ensure};
21
22use crate::error::{self, Result};
23use crate::generator::Random;
24use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption};
25use crate::ir::create_expr::{ColumnOption, PartitionDef};
26use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident};
27
28pub type TableContextRef = Arc<TableContext>;
29
30#[derive(Debug, Clone)]
32pub struct TableContext {
33 pub name: Ident,
34 pub columns: Vec<Column>,
35
36 pub partition: Option<PartitionDef>,
38 pub primary_keys: Vec<usize>,
39 pub table_options: Vec<AlterTableOption>,
40}
41
42impl From<&CreateTableExpr> for TableContext {
43 fn from(
44 CreateTableExpr {
45 table_name: name,
46 columns,
47 partition,
48 primary_keys,
49 ..
50 }: &CreateTableExpr,
51 ) -> Self {
52 Self {
53 name: name.clone(),
54 columns: columns.clone(),
55 partition: partition.clone(),
56 primary_keys: primary_keys.clone(),
57 table_options: vec![],
58 }
59 }
60}
61
62impl TableContext {
63 pub fn timestamp_column(&self) -> Option<Column> {
65 self.columns.iter().find(|c| c.is_time_index()).cloned()
66 }
67
68 pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
70 match expr.alter_kinds {
71 AlterTableOperation::AddColumn { column, location } => {
72 ensure!(
73 !self.columns.iter().any(|col| col.name == column.name),
74 error::UnexpectedSnafu {
75 violated: format!("Column {} exists", column.name),
76 }
77 );
78 match location {
79 Some(AddColumnLocation::First) => {
80 let mut columns = Vec::with_capacity(self.columns.len() + 1);
81 columns.push(column);
82 columns.extend(self.columns);
83 self.columns = columns;
84 }
85 Some(AddColumnLocation::After { column_name }) => {
86 let index = self
87 .columns
88 .iter()
89 .position(|col| col.name.to_string() == column_name)
91 .context(error::UnexpectedSnafu {
92 violated: format!("Column: {column_name} not found"),
93 })?;
94 self.columns.insert(index + 1, column);
95 }
96 None => self.columns.push(column),
97 }
98 self.primary_keys = self
100 .columns
101 .iter()
102 .enumerate()
103 .flat_map(|(idx, col)| {
104 if col.is_primary_key() {
105 Some(idx)
106 } else {
107 None
108 }
109 })
110 .collect();
111 Ok(self)
112 }
113 AlterTableOperation::DropColumn { name } => {
114 self.columns.retain(|col| col.name != name);
115 self.primary_keys = self
117 .columns
118 .iter()
119 .enumerate()
120 .flat_map(|(idx, col)| {
121 if col.is_primary_key() {
122 Some(idx)
123 } else {
124 None
125 }
126 })
127 .collect();
128 Ok(self)
129 }
130 AlterTableOperation::RenameTable { new_table_name } => {
131 ensure!(
132 new_table_name != self.name,
133 error::UnexpectedSnafu {
134 violated: "The new table name is equal the current name",
135 }
136 );
137 self.name = new_table_name;
138 Ok(self)
139 }
140 AlterTableOperation::ModifyDataType { column } => {
141 if let Some(idx) = self.columns.iter().position(|col| col.name == column.name) {
142 self.columns[idx].column_type = column.column_type.clone();
143 for opt in self.columns[idx].options.iter_mut() {
144 if let ColumnOption::DefaultValue(value) = opt {
145 *value = cast(value.clone(), &column.column_type).unwrap();
146 }
147 }
148 }
149 Ok(self)
150 }
151 AlterTableOperation::SetTableOptions { options } => {
152 for option in options {
153 if let Some(idx) = self
154 .table_options
155 .iter()
156 .position(|opt| opt.key() == option.key())
157 {
158 self.table_options[idx] = option;
159 } else {
160 self.table_options.push(option);
161 }
162 }
163 Ok(self)
164 }
165 AlterTableOperation::UnsetTableOptions { keys } => {
166 self.table_options
167 .retain(|opt| !keys.contains(&opt.key().to_string()));
168 Ok(self)
169 }
170 }
171 }
172
173 pub fn generate_unique_column_name<R: Rng>(
174 &self,
175 rng: &mut R,
176 generator: &dyn Random<Ident, R>,
177 ) -> Ident {
178 let mut name = generator.generate(rng);
179 while self.columns.iter().any(|col| col.name.value == name.value) {
180 name = generator.generate(rng);
181 }
182 name
183 }
184
185 pub fn generate_unique_table_name<R: Rng>(
186 &self,
187 rng: &mut R,
188 generator: &dyn Random<Ident, R>,
189 ) -> Ident {
190 let mut name = generator.generate(rng);
191 while self.name.value == name.value {
192 name = generator.generate(rng);
193 }
194 name
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use common_query::AddColumnLocation;
201 use common_time::Duration;
202 use datatypes::data_type::ConcreteDataType;
203
204 use super::TableContext;
205 use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption, Ttl};
206 use crate::ir::create_expr::ColumnOption;
207 use crate::ir::{AlterTableExpr, Column, Ident};
208
209 #[test]
210 fn test_table_context_alter() {
211 let table_ctx = TableContext {
212 name: "foo".into(),
213 columns: vec![],
214 partition: None,
215 primary_keys: vec![],
216 table_options: vec![],
217 };
218 let expr = AlterTableExpr {
220 table_name: "foo".into(),
221 alter_kinds: AlterTableOperation::AddColumn {
222 column: Column {
223 name: "a".into(),
224 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
225 options: vec![ColumnOption::PrimaryKey],
226 },
227 location: None,
228 },
229 };
230 let table_ctx = table_ctx.alter(expr).unwrap();
231 assert_eq!(table_ctx.columns[0].name, Ident::new("a"));
232 assert_eq!(table_ctx.primary_keys, vec![0]);
233
234 let expr = AlterTableExpr {
236 table_name: "foo".into(),
237 alter_kinds: AlterTableOperation::AddColumn {
238 column: Column {
239 name: "b".into(),
240 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
241 options: vec![ColumnOption::PrimaryKey],
242 },
243 location: Some(AddColumnLocation::First),
244 },
245 };
246 let table_ctx = table_ctx.alter(expr).unwrap();
247 assert_eq!(table_ctx.columns[0].name, Ident::new("b"));
248 assert_eq!(table_ctx.primary_keys, vec![0, 1]);
249
250 let expr = AlterTableExpr {
252 table_name: "foo".into(),
253 alter_kinds: AlterTableOperation::AddColumn {
254 column: Column {
255 name: "c".into(),
256 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
257 options: vec![ColumnOption::PrimaryKey],
258 },
259 location: Some(AddColumnLocation::After {
260 column_name: "b".into(),
261 }),
262 },
263 };
264 let table_ctx = table_ctx.alter(expr).unwrap();
265 assert_eq!(table_ctx.columns[1].name, Ident::new("c"));
266 assert_eq!(table_ctx.primary_keys, vec![0, 1, 2]);
267
268 let expr = AlterTableExpr {
270 table_name: "foo".into(),
271 alter_kinds: AlterTableOperation::DropColumn { name: "b".into() },
272 };
273 let table_ctx = table_ctx.alter(expr).unwrap();
274 assert_eq!(table_ctx.columns[1].name, Ident::new("a"));
275 assert_eq!(table_ctx.primary_keys, vec![0, 1]);
276
277 let ttl_option = AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60)));
279 let expr = AlterTableExpr {
280 table_name: "foo".into(),
281 alter_kinds: AlterTableOperation::SetTableOptions {
282 options: vec![ttl_option.clone()],
283 },
284 };
285 let table_ctx = table_ctx.alter(expr).unwrap();
286 assert_eq!(table_ctx.table_options.len(), 1);
287 assert_eq!(table_ctx.table_options[0], ttl_option);
288
289 let expr = AlterTableExpr {
291 table_name: "foo".into(),
292 alter_kinds: AlterTableOperation::UnsetTableOptions {
293 keys: vec![ttl_option.key().to_string()],
294 },
295 };
296 let table_ctx = table_ctx.alter(expr).unwrap();
297 assert_eq!(table_ctx.table_options.len(), 0);
298 }
299}