1use std::sync::Arc;
16
17use common_query::AddColumnLocation;
18use datatypes::types::cast;
19use partition::partition::PartitionDef;
20use rand::Rng;
21use snafu::{ensure, OptionExt};
22
23use crate::error::{self, Result};
24use crate::generator::Random;
25use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption};
26use crate::ir::create_expr::ColumnOption;
27use crate::ir::{AlterTableExpr, Column, CreateTableExpr, Ident};
28
29pub type TableContextRef = Arc<TableContext>;
30
31#[derive(Debug, Clone)]
33pub struct TableContext {
34 pub name: Ident,
35 pub columns: Vec<Column>,
36
37 pub partition: Option<PartitionDef>,
39 pub primary_keys: Vec<usize>,
40 pub table_options: Vec<AlterTableOption>,
41}
42
43impl From<&CreateTableExpr> for TableContext {
44 fn from(
45 CreateTableExpr {
46 table_name: name,
47 columns,
48 partition,
49 primary_keys,
50 ..
51 }: &CreateTableExpr,
52 ) -> Self {
53 Self {
54 name: name.clone(),
55 columns: columns.clone(),
56 partition: partition.clone(),
57 primary_keys: primary_keys.clone(),
58 table_options: vec![],
59 }
60 }
61}
62
63impl TableContext {
64 pub fn timestamp_column(&self) -> Option<Column> {
66 self.columns.iter().find(|c| c.is_time_index()).cloned()
67 }
68
69 pub fn alter(mut self, expr: AlterTableExpr) -> Result<TableContext> {
71 match expr.alter_kinds {
72 AlterTableOperation::AddColumn { column, location } => {
73 ensure!(
74 !self.columns.iter().any(|col| col.name == column.name),
75 error::UnexpectedSnafu {
76 violated: format!("Column {} exists", column.name),
77 }
78 );
79 match location {
80 Some(AddColumnLocation::First) => {
81 let mut columns = Vec::with_capacity(self.columns.len() + 1);
82 columns.push(column);
83 columns.extend(self.columns);
84 self.columns = columns;
85 }
86 Some(AddColumnLocation::After { column_name }) => {
87 let index = self
88 .columns
89 .iter()
90 .position(|col| col.name.to_string() == column_name)
92 .context(error::UnexpectedSnafu {
93 violated: format!("Column: {column_name} not found"),
94 })?;
95 self.columns.insert(index + 1, column);
96 }
97 None => self.columns.push(column),
98 }
99 self.primary_keys = self
101 .columns
102 .iter()
103 .enumerate()
104 .flat_map(|(idx, col)| {
105 if col.is_primary_key() {
106 Some(idx)
107 } else {
108 None
109 }
110 })
111 .collect();
112 Ok(self)
113 }
114 AlterTableOperation::DropColumn { name } => {
115 self.columns.retain(|col| col.name != name);
116 self.primary_keys = self
118 .columns
119 .iter()
120 .enumerate()
121 .flat_map(|(idx, col)| {
122 if col.is_primary_key() {
123 Some(idx)
124 } else {
125 None
126 }
127 })
128 .collect();
129 Ok(self)
130 }
131 AlterTableOperation::RenameTable { new_table_name } => {
132 ensure!(
133 new_table_name != self.name,
134 error::UnexpectedSnafu {
135 violated: "The new table name is equal the current name",
136 }
137 );
138 self.name = new_table_name;
139 Ok(self)
140 }
141 AlterTableOperation::ModifyDataType { column } => {
142 if let Some(idx) = self.columns.iter().position(|col| col.name == column.name) {
143 self.columns[idx].column_type = column.column_type.clone();
144 for opt in self.columns[idx].options.iter_mut() {
145 if let ColumnOption::DefaultValue(value) = opt {
146 *value = cast(value.clone(), &column.column_type).unwrap();
147 }
148 }
149 }
150 Ok(self)
151 }
152 AlterTableOperation::SetTableOptions { options } => {
153 for option in options {
154 if let Some(idx) = self
155 .table_options
156 .iter()
157 .position(|opt| opt.key() == option.key())
158 {
159 self.table_options[idx] = option;
160 } else {
161 self.table_options.push(option);
162 }
163 }
164 Ok(self)
165 }
166 AlterTableOperation::UnsetTableOptions { keys } => {
167 self.table_options
168 .retain(|opt| !keys.contains(&opt.key().to_string()));
169 Ok(self)
170 }
171 }
172 }
173
174 pub fn generate_unique_column_name<R: Rng>(
175 &self,
176 rng: &mut R,
177 generator: &dyn Random<Ident, R>,
178 ) -> Ident {
179 let mut name = generator.gen(rng);
180 while self.columns.iter().any(|col| col.name.value == name.value) {
181 name = generator.gen(rng);
182 }
183 name
184 }
185
186 pub fn generate_unique_table_name<R: Rng>(
187 &self,
188 rng: &mut R,
189 generator: &dyn Random<Ident, R>,
190 ) -> Ident {
191 let mut name = generator.gen(rng);
192 while self.name.value == name.value {
193 name = generator.gen(rng);
194 }
195 name
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use common_query::AddColumnLocation;
202 use common_time::Duration;
203 use datatypes::data_type::ConcreteDataType;
204
205 use super::TableContext;
206 use crate::ir::alter_expr::{AlterTableOperation, AlterTableOption, Ttl};
207 use crate::ir::create_expr::ColumnOption;
208 use crate::ir::{AlterTableExpr, Column, Ident};
209
210 #[test]
211 fn test_table_context_alter() {
212 let table_ctx = TableContext {
213 name: "foo".into(),
214 columns: vec![],
215 partition: None,
216 primary_keys: vec![],
217 table_options: vec![],
218 };
219 let expr = AlterTableExpr {
221 table_name: "foo".into(),
222 alter_kinds: AlterTableOperation::AddColumn {
223 column: Column {
224 name: "a".into(),
225 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
226 options: vec![ColumnOption::PrimaryKey],
227 },
228 location: None,
229 },
230 };
231 let table_ctx = table_ctx.alter(expr).unwrap();
232 assert_eq!(table_ctx.columns[0].name, Ident::new("a"));
233 assert_eq!(table_ctx.primary_keys, vec![0]);
234
235 let expr = AlterTableExpr {
237 table_name: "foo".into(),
238 alter_kinds: AlterTableOperation::AddColumn {
239 column: Column {
240 name: "b".into(),
241 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
242 options: vec![ColumnOption::PrimaryKey],
243 },
244 location: Some(AddColumnLocation::First),
245 },
246 };
247 let table_ctx = table_ctx.alter(expr).unwrap();
248 assert_eq!(table_ctx.columns[0].name, Ident::new("b"));
249 assert_eq!(table_ctx.primary_keys, vec![0, 1]);
250
251 let expr = AlterTableExpr {
253 table_name: "foo".into(),
254 alter_kinds: AlterTableOperation::AddColumn {
255 column: Column {
256 name: "c".into(),
257 column_type: ConcreteDataType::timestamp_microsecond_datatype(),
258 options: vec![ColumnOption::PrimaryKey],
259 },
260 location: Some(AddColumnLocation::After {
261 column_name: "b".into(),
262 }),
263 },
264 };
265 let table_ctx = table_ctx.alter(expr).unwrap();
266 assert_eq!(table_ctx.columns[1].name, Ident::new("c"));
267 assert_eq!(table_ctx.primary_keys, vec![0, 1, 2]);
268
269 let expr = AlterTableExpr {
271 table_name: "foo".into(),
272 alter_kinds: AlterTableOperation::DropColumn { name: "b".into() },
273 };
274 let table_ctx = table_ctx.alter(expr).unwrap();
275 assert_eq!(table_ctx.columns[1].name, Ident::new("a"));
276 assert_eq!(table_ctx.primary_keys, vec![0, 1]);
277
278 let ttl_option = AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60)));
280 let expr = AlterTableExpr {
281 table_name: "foo".into(),
282 alter_kinds: AlterTableOperation::SetTableOptions {
283 options: vec![ttl_option.clone()],
284 },
285 };
286 let table_ctx = table_ctx.alter(expr).unwrap();
287 assert_eq!(table_ctx.table_options.len(), 1);
288 assert_eq!(table_ctx.table_options[0], ttl_option);
289
290 let expr = AlterTableExpr {
292 table_name: "foo".into(),
293 alter_kinds: AlterTableOperation::UnsetTableOptions {
294 keys: vec![ttl_option.key().to_string()],
295 },
296 };
297 let table_ctx = table_ctx.alter(expr).unwrap();
298 assert_eq!(table_ctx.table_options.len(), 0);
299 }
300}