1use api::helper::ColumnDataTypeWrapper;
16use api::v1::add_column_location::LocationType;
17use api::v1::alter_table_expr::Kind;
18use api::v1::column_def::{
19 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
20};
21use api::v1::{
22 column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
23 DropColumns, FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable,
24 SemanticType, SkippingIndexType as PbSkippingIndexType,
25};
26use common_query::AddColumnLocation;
27use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
28use snafu::{ensure, OptionExt, ResultExt};
29use store_api::region_request::{SetRegionOption, UnsetRegionOption};
30use table::metadata::TableId;
31use table::requests::{
32 AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest, SetIndexOptions,
33 UnsetIndexOptions,
34};
35
36use crate::error::{
37 InvalidColumnDefSnafu, InvalidSetFulltextOptionRequestSnafu,
38 InvalidSetSkippingIndexOptionRequestSnafu, InvalidSetTableOptionRequestSnafu,
39 InvalidUnsetTableOptionRequestSnafu, MissingAlterIndexOptionSnafu, MissingFieldSnafu,
40 MissingTimestampColumnSnafu, Result, UnknownLocationTypeSnafu,
41};
42
43const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
44const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
45
46pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
48 let catalog_name = expr.catalog_name;
49 let schema_name = expr.schema_name;
50 let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
51 let alter_kind = match kind {
52 Kind::AddColumns(add_columns) => {
53 let add_column_requests = add_columns
54 .add_columns
55 .into_iter()
56 .map(|ac| {
57 let column_def = ac.column_def.context(MissingFieldSnafu {
58 field: "column_def",
59 })?;
60
61 let schema = column_def::try_as_column_schema(&column_def).context(
62 InvalidColumnDefSnafu {
63 column: &column_def.name,
64 },
65 )?;
66 Ok(AddColumnRequest {
67 column_schema: schema,
68 is_key: column_def.semantic_type == SemanticType::Tag as i32,
69 location: parse_location(ac.location)?,
70 add_if_not_exists: ac.add_if_not_exists,
71 })
72 })
73 .collect::<Result<Vec<_>>>()?;
74
75 AlterKind::AddColumns {
76 columns: add_column_requests,
77 }
78 }
79 Kind::ModifyColumnTypes(ModifyColumnTypes {
80 modify_column_types,
81 }) => {
82 let modify_column_type_requests = modify_column_types
83 .into_iter()
84 .map(|cct| {
85 let target_type =
86 ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
87 .into();
88
89 Ok(ModifyColumnTypeRequest {
90 column_name: cct.column_name,
91 target_type,
92 })
93 })
94 .collect::<Result<Vec<_>>>()?;
95
96 AlterKind::ModifyColumnTypes {
97 columns: modify_column_type_requests,
98 }
99 }
100 Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
101 names: drop_columns.into_iter().map(|c| c.name).collect(),
102 },
103 Kind::RenameTable(RenameTable { new_table_name }) => {
104 AlterKind::RenameTable { new_table_name }
105 }
106 Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
107 AlterKind::SetTableOptions {
108 options: table_options
109 .iter()
110 .map(SetRegionOption::try_from)
111 .collect::<std::result::Result<Vec<_>, _>>()
112 .context(InvalidSetTableOptionRequestSnafu)?,
113 }
114 }
115 Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
116 AlterKind::UnsetTableOptions {
117 keys: keys
118 .iter()
119 .map(|key| UnsetRegionOption::try_from(key.as_str()))
120 .collect::<std::result::Result<Vec<_>, _>>()
121 .context(InvalidUnsetTableOptionRequestSnafu)?,
122 }
123 }
124 Kind::SetIndex(o) => match o.options {
125 Some(opt) => match opt {
126 api::v1::set_index::Options::Fulltext(f) => AlterKind::SetIndex {
127 options: SetIndexOptions::Fulltext {
128 column_name: f.column_name.clone(),
129 options: FulltextOptions {
130 enable: f.enable,
131 analyzer: as_fulltext_option_analyzer(
132 Analyzer::try_from(f.analyzer)
133 .context(InvalidSetFulltextOptionRequestSnafu)?,
134 ),
135 case_sensitive: f.case_sensitive,
136 backend: as_fulltext_option_backend(
137 PbFulltextBackend::try_from(f.backend)
138 .context(InvalidSetFulltextOptionRequestSnafu)?,
139 ),
140 },
141 },
142 },
143 api::v1::set_index::Options::Inverted(i) => AlterKind::SetIndex {
144 options: SetIndexOptions::Inverted {
145 column_name: i.column_name,
146 },
147 },
148 api::v1::set_index::Options::Skipping(s) => AlterKind::SetIndex {
149 options: SetIndexOptions::Skipping {
150 column_name: s.column_name,
151 options: SkippingIndexOptions {
152 granularity: s.granularity as u32,
153 index_type: as_skipping_index_type(
154 PbSkippingIndexType::try_from(s.skipping_index_type)
155 .context(InvalidSetSkippingIndexOptionRequestSnafu)?,
156 ),
157 },
158 },
159 },
160 },
161 None => return MissingAlterIndexOptionSnafu.fail(),
162 },
163 Kind::UnsetIndex(o) => match o.options {
164 Some(opt) => match opt {
165 api::v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
166 options: UnsetIndexOptions::Fulltext {
167 column_name: f.column_name,
168 },
169 },
170 api::v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
171 options: UnsetIndexOptions::Inverted {
172 column_name: i.column_name,
173 },
174 },
175 api::v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
176 options: UnsetIndexOptions::Skipping {
177 column_name: s.column_name,
178 },
179 },
180 },
181 None => return MissingAlterIndexOptionSnafu.fail(),
182 },
183 };
184
185 let request = AlterTableRequest {
186 catalog_name,
187 schema_name,
188 table_name: expr.table_name,
189 table_id,
190 alter_kind,
191 table_version: None,
192 };
193 Ok(request)
194}
195
196pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<RawSchema> {
197 let column_schemas = expr
198 .column_defs
199 .iter()
200 .map(|x| {
201 column_def::try_as_column_schema(x).context(InvalidColumnDefSnafu { column: &x.name })
202 })
203 .collect::<Result<Vec<ColumnSchema>>>()?;
204
205 if require_time_index {
207 ensure!(
208 column_schemas
209 .iter()
210 .any(|column| column.name == expr.time_index),
211 MissingTimestampColumnSnafu {
212 msg: format!("CreateExpr: {expr:?}")
213 }
214 );
215 }
216
217 let column_schemas = column_schemas
218 .into_iter()
219 .map(|column_schema| {
220 if column_schema.name == expr.time_index {
221 column_schema.with_time_index(true)
222 } else {
223 column_schema
224 }
225 })
226 .collect::<Vec<_>>();
227
228 Ok(RawSchema::new(column_schemas))
229}
230
231fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
232 match location {
233 Some(Location {
234 location_type: LOCATION_TYPE_FIRST,
235 ..
236 }) => Ok(Some(AddColumnLocation::First)),
237 Some(Location {
238 location_type: LOCATION_TYPE_AFTER,
239 after_column_name,
240 }) => Ok(Some(AddColumnLocation::After {
241 column_name: after_column_name,
242 })),
243 Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(),
244 None => Ok(None),
245 }
246}
247
#[cfg(test)]
mod tests {
    use api::v1::{
        AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
        SemanticType,
    };
    use datatypes::prelude::ConcreteDataType;

    use super::*;

    /// Builds a non-nullable `Float64` field column definition called `name`.
    fn float64_field_def(name: &str) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            data_type: ColumnDataType::Float64 as i32,
            is_nullable: false,
            default_constraint: vec![],
            semantic_type: SemanticType::Field as i32,
            comment: String::new(),
            ..Default::default()
        }
    }

    /// Wraps `kind` in an alter expression that targets the `monitor` table.
    fn monitor_alter_expr(catalog_name: &str, schema_name: &str, kind: Kind) -> AlterTableExpr {
        AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.to_string(),
            table_name: "monitor".to_string(),

            kind: Some(kind),
        }
    }

    /// Checks that `request` kept the given catalog/schema names and the `monitor` table.
    fn assert_table_ref(request: &AlterTableRequest, catalog_name: &str, schema_name: &str) {
        assert_eq!(request.catalog_name, catalog_name);
        assert_eq!(request.schema_name, schema_name);
        assert_eq!("monitor".to_string(), request.table_name);
    }

    /// Checks that `column` is a non-key `Float64` column called `expected_name`.
    fn assert_float64_field(column: &AddColumnRequest, expected_name: &str) {
        assert!(!column.is_key);
        assert_eq!(expected_name, column.column_schema.name);
        assert_eq!(
            ConcreteDataType::float64_datatype(),
            column.column_schema.data_type
        );
    }

    #[test]
    fn test_alter_expr_to_request() {
        let kind = Kind::AddColumns(AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(float64_field_def("mem_usage")),
                location: None,
                add_if_not_exists: true,
            }],
        });

        let request = alter_expr_to_request(1, monitor_alter_expr("", "", kind)).unwrap();
        assert_table_ref(&request, "", "");

        let mut columns = match request.alter_kind {
            AlterKind::AddColumns { columns } => columns,
            _ => unreachable!(),
        };
        let added = columns.pop().unwrap();

        assert_float64_field(&added, "mem_usage");
        assert_eq!(None, added.location);
        assert!(added.add_if_not_exists);
    }

    #[test]
    fn test_alter_expr_with_location_to_request() {
        let first = AddColumn {
            column_def: Some(float64_field_def("mem_usage")),
            location: Some(Location {
                location_type: LocationType::First.into(),
                after_column_name: String::default(),
            }),
            add_if_not_exists: false,
        };
        let after_ts = AddColumn {
            column_def: Some(float64_field_def("cpu_usage")),
            location: Some(Location {
                location_type: LocationType::After.into(),
                after_column_name: "ts".to_string(),
            }),
            add_if_not_exists: true,
        };
        let kind = Kind::AddColumns(AddColumns {
            add_columns: vec![first, after_ts],
        });

        let request = alter_expr_to_request(1, monitor_alter_expr("", "", kind)).unwrap();
        assert_table_ref(&request, "", "");

        let columns = match request.alter_kind {
            AlterKind::AddColumns { columns } => columns,
            _ => unreachable!(),
        };
        // Walk the requests from the back, last added column first.
        let mut from_back = columns.into_iter().rev();

        let cpu_usage = from_back.next().unwrap();
        assert_float64_field(&cpu_usage, "cpu_usage");
        assert_eq!(
            Some(AddColumnLocation::After {
                column_name: "ts".to_string()
            }),
            cpu_usage.location
        );
        assert!(cpu_usage.add_if_not_exists);

        let mem_usage = from_back.next().unwrap();
        assert_float64_field(&mem_usage, "mem_usage");
        assert_eq!(Some(AddColumnLocation::First), mem_usage.location);
        assert!(!mem_usage.add_if_not_exists);
    }

    #[test]
    fn test_modify_column_type_expr() {
        let kind = Kind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types: vec![ModifyColumnType {
                column_name: "mem_usage".to_string(),
                target_type: ColumnDataType::String as i32,
                target_type_extension: None,
            }],
        });

        let request =
            alter_expr_to_request(1, monitor_alter_expr("test_catalog", "test_schema", kind))
                .unwrap();
        assert_table_ref(&request, "test_catalog", "test_schema");

        let mut columns = match request.alter_kind {
            AlterKind::ModifyColumnTypes { columns } => columns,
            _ => unreachable!(),
        };
        let modified = columns.pop().unwrap();

        assert_eq!("mem_usage", modified.column_name);
        assert_eq!(ConcreteDataType::string_datatype(), modified.target_type);
    }

    #[test]
    fn test_drop_column_expr() {
        let kind = Kind::DropColumns(DropColumns {
            drop_columns: vec![DropColumn {
                name: "mem_usage".to_string(),
            }],
        });

        let request =
            alter_expr_to_request(1, monitor_alter_expr("test_catalog", "test_schema", kind))
                .unwrap();
        assert_table_ref(&request, "test_catalog", "test_schema");

        let mut names = match request.alter_kind {
            AlterKind::DropColumns { names } => names,
            _ => unreachable!(),
        };
        assert_eq!(1, names.len());
        assert_eq!("mem_usage".to_string(), names.pop().unwrap());
    }
}