1use api::helper::ColumnDataTypeWrapper;
16use api::v1::add_column_location::LocationType;
17use api::v1::alter_table_expr::Kind;
18use api::v1::column_def::{
19 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
20};
21use api::v1::{
22 AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr, DropColumns,
23 FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable, SemanticType,
24 SkippingIndexType as PbSkippingIndexType, column_def,
25};
26use common_query::AddColumnLocation;
27use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SkippingIndexOptions};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::region_request::{SetRegionOption, UnsetRegionOption};
30use table::metadata::{TableId, TableMeta};
31use table::requests::{
32 AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest, SetDefaultRequest,
33 SetIndexOption, UnsetIndexOption,
34};
35
36use crate::error::{
37 self, ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
38 InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu,
39 InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu,
40 MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu,
41 MissingTimestampColumnSnafu, Result, UnknownLocationTypeSnafu,
42};
43
/// Raw `i32` value of `LocationType::First`, kept as a constant so it can be used as a
/// match pattern in `parse_location`.
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
/// Raw `i32` value of `LocationType::After`, kept as a constant so it can be used as a
/// match pattern in `parse_location`.
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
46
47fn set_index_option_from_proto(set_index: api::v1::SetIndex) -> Result<SetIndexOption> {
48 let options = set_index.options.context(MissingAlterIndexOptionSnafu)?;
49 Ok(match options {
50 api::v1::set_index::Options::Fulltext(f) => SetIndexOption::Fulltext {
51 column_name: f.column_name.clone(),
52 options: FulltextOptions::new(
53 f.enable,
54 as_fulltext_option_analyzer(
55 Analyzer::try_from(f.analyzer).context(InvalidSetFulltextOptionRequestSnafu)?,
56 ),
57 f.case_sensitive,
58 as_fulltext_option_backend(
59 PbFulltextBackend::try_from(f.backend)
60 .context(InvalidSetFulltextOptionRequestSnafu)?,
61 ),
62 f.granularity as u32,
63 f.false_positive_rate,
64 )
65 .context(InvalidIndexOptionSnafu)?,
66 },
67 api::v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
68 column_name: i.column_name,
69 },
70 api::v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
71 column_name: s.column_name,
72 options: SkippingIndexOptions::new(
73 s.granularity as u32,
74 s.false_positive_rate,
75 as_skipping_index_type(
76 PbSkippingIndexType::try_from(s.skipping_index_type)
77 .context(InvalidSetSkippingIndexOptionRequestSnafu)?,
78 ),
79 )
80 .context(InvalidIndexOptionSnafu)?,
81 },
82 })
83}
84
85fn unset_index_option_from_proto(unset_index: api::v1::UnsetIndex) -> Result<UnsetIndexOption> {
86 let options = unset_index.options.context(MissingAlterIndexOptionSnafu)?;
87 Ok(match options {
88 api::v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
89 column_name: f.column_name,
90 },
91 api::v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
92 column_name: i.column_name,
93 },
94 api::v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
95 column_name: s.column_name,
96 },
97 })
98}
99
100pub fn alter_expr_to_request(
104 table_id: TableId,
105 expr: AlterTableExpr,
106 table_meta: Option<&TableMeta>,
107) -> Result<AlterTableRequest> {
108 let catalog_name = expr.catalog_name;
109 let schema_name = expr.schema_name;
110 let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
111 let alter_kind = match kind {
112 Kind::AddColumns(add_columns) => {
113 let add_column_requests = add_columns
114 .add_columns
115 .into_iter()
116 .map(|ac| {
117 let column_def = ac.column_def.context(MissingFieldSnafu {
118 field: "column_def",
119 })?;
120
121 let schema = column_def::try_as_column_schema(&column_def).context(
122 InvalidColumnDefSnafu {
123 column: &column_def.name,
124 },
125 )?;
126 Ok(AddColumnRequest {
127 column_schema: schema,
128 is_key: column_def.semantic_type == SemanticType::Tag as i32,
129 location: parse_location(ac.location)?,
130 add_if_not_exists: ac.add_if_not_exists,
131 })
132 })
133 .collect::<Result<Vec<_>>>()?;
134
135 AlterKind::AddColumns {
136 columns: add_column_requests,
137 }
138 }
139 Kind::ModifyColumnTypes(ModifyColumnTypes {
140 modify_column_types,
141 }) => {
142 let modify_column_type_requests = modify_column_types
143 .into_iter()
144 .map(|cct| {
145 let target_type =
146 ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
147 .into();
148
149 Ok(ModifyColumnTypeRequest {
150 column_name: cct.column_name,
151 target_type,
152 })
153 })
154 .collect::<Result<Vec<_>>>()?;
155
156 AlterKind::ModifyColumnTypes {
157 columns: modify_column_type_requests,
158 }
159 }
160 Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
161 names: drop_columns.into_iter().map(|c| c.name).collect(),
162 },
163 Kind::RenameTable(RenameTable { new_table_name }) => {
164 AlterKind::RenameTable { new_table_name }
165 }
166 Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
167 AlterKind::SetTableOptions {
168 options: table_options
169 .iter()
170 .map(SetRegionOption::try_from)
171 .collect::<std::result::Result<Vec<_>, _>>()
172 .context(InvalidSetTableOptionRequestSnafu)?,
173 }
174 }
175 Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
176 AlterKind::UnsetTableOptions {
177 keys: keys
178 .iter()
179 .map(|key| UnsetRegionOption::try_from(key.as_str()))
180 .collect::<std::result::Result<Vec<_>, _>>()
181 .context(InvalidUnsetTableOptionRequestSnafu)?,
182 }
183 }
184 Kind::SetIndex(o) => {
185 let option = set_index_option_from_proto(o)?;
186 AlterKind::SetIndexes {
187 options: vec![option],
188 }
189 }
190 Kind::UnsetIndex(o) => {
191 let option = unset_index_option_from_proto(o)?;
192 AlterKind::UnsetIndexes {
193 options: vec![option],
194 }
195 }
196 Kind::SetIndexes(o) => {
197 let options = o
198 .set_indexes
199 .into_iter()
200 .map(set_index_option_from_proto)
201 .collect::<Result<Vec<_>>>()?;
202 AlterKind::SetIndexes { options }
203 }
204 Kind::UnsetIndexes(o) => {
205 let options = o
206 .unset_indexes
207 .into_iter()
208 .map(unset_index_option_from_proto)
209 .collect::<Result<Vec<_>>>()?;
210 AlterKind::UnsetIndexes { options }
211 }
212 Kind::DropDefaults(o) => {
213 let names = o
214 .drop_defaults
215 .into_iter()
216 .map(|col| {
217 ensure!(
218 !col.column_name.is_empty(),
219 MissingFieldSnafu {
220 field: "column_name"
221 }
222 );
223 Ok(col.column_name)
224 })
225 .collect::<Result<Vec<_>>>()?;
226 AlterKind::DropDefaults { names }
227 }
228 Kind::SetDefaults(o) => {
229 let table_meta = table_meta.context(MissingTableMetaSnafu { table_id })?;
230 let defaults = o
231 .set_defaults
232 .into_iter()
233 .map(|col| {
234 let column_scheme = table_meta
235 .schema
236 .column_schema_by_name(&col.column_name)
237 .context(ColumnNotFoundSnafu {
238 column_name: &col.column_name,
239 })?;
240 let default_constraint = common_sql::convert::deserialize_default_constraint(
241 col.default_constraint.as_slice(),
242 &col.column_name,
243 &column_scheme.data_type,
244 )
245 .context(crate::error::SqlCommonSnafu)?;
246 Ok(SetDefaultRequest {
247 column_name: col.column_name,
248 default_constraint,
249 })
250 })
251 .collect::<Result<Vec<_>>>()?;
252 AlterKind::SetDefaults { defaults }
253 }
254 Kind::Repartition(_) => error::UnexpectedSnafu {
255 err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
256 }
257 .fail()?,
258 };
259
260 let request = AlterTableRequest {
261 catalog_name,
262 schema_name,
263 table_name: expr.table_name,
264 table_id,
265 alter_kind,
266 table_version: None,
267 };
268 Ok(request)
269}
270
271pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<Schema> {
272 let column_schemas = expr
273 .column_defs
274 .iter()
275 .map(|x| {
276 column_def::try_as_column_schema(x).context(InvalidColumnDefSnafu { column: &x.name })
277 })
278 .collect::<Result<Vec<ColumnSchema>>>()?;
279
280 if require_time_index {
282 ensure!(
283 column_schemas
284 .iter()
285 .any(|column| column.name == expr.time_index),
286 MissingTimestampColumnSnafu {
287 msg: format!("CreateExpr: {expr:?}")
288 }
289 );
290 }
291
292 let column_schemas = column_schemas
293 .into_iter()
294 .map(|column_schema| {
295 if column_schema.name == expr.time_index {
296 column_schema.with_time_index(true)
297 } else {
298 column_schema
299 }
300 })
301 .collect::<Vec<_>>();
302
303 Ok(Schema::new(column_schemas))
304}
305
306fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
307 match location {
308 Some(Location {
309 location_type: LOCATION_TYPE_FIRST,
310 ..
311 }) => Ok(Some(AddColumnLocation::First)),
312 Some(Location {
313 location_type: LOCATION_TYPE_AFTER,
314 after_column_name,
315 }) => Ok(Some(AddColumnLocation::After {
316 column_name: after_column_name,
317 })),
318 Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(),
319 None => Ok(None),
320 }
321}
322
#[cfg(test)]
mod tests {
    use api::v1::{
        AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
        SemanticType,
    };
    use datatypes::prelude::ConcreteDataType;

    use super::*;

    /// Builds a non-nullable `Float64` field column definition named `name`.
    fn float64_field_def(name: &str) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            data_type: ColumnDataType::Float64 as i32,
            is_nullable: false,
            default_constraint: vec![],
            semantic_type: SemanticType::Field as i32,
            comment: String::new(),
            ..Default::default()
        }
    }

    /// Builds an alter expression for table `monitor` in the given catalog and schema.
    fn monitor_alter_expr(catalog: &str, schema: &str, kind: Kind) -> AlterTableExpr {
        AlterTableExpr {
            catalog_name: catalog.to_string(),
            schema_name: schema.to_string(),
            table_name: "monitor".to_string(),

            kind: Some(kind),
        }
    }

    /// Asserts that `request` targets table `monitor` in the given catalog and schema.
    fn assert_monitor_target(request: &AlterTableRequest, catalog: &str, schema: &str) {
        assert_eq!(request.catalog_name, catalog);
        assert_eq!(request.schema_name, schema);
        assert_eq!("monitor", request.table_name);
    }

    /// Asserts that `column` is a non-key `Float64` column named `name`.
    fn assert_float64_field(column: &AddColumnRequest, name: &str) {
        assert!(!column.is_key);
        assert_eq!(name, column.column_schema.name);
        assert_eq!(
            ConcreteDataType::float64_datatype(),
            column.column_schema.data_type
        );
    }

    #[test]
    fn test_alter_expr_to_request() {
        let kind = Kind::AddColumns(AddColumns {
            add_columns: vec![AddColumn {
                column_def: Some(float64_field_def("mem_usage")),
                location: None,
                add_if_not_exists: true,
            }],
        });

        let request = alter_expr_to_request(1, monitor_alter_expr("", "", kind), None).unwrap();
        assert_monitor_target(&request, "", "");

        let AlterKind::AddColumns { mut columns } = request.alter_kind else {
            unreachable!()
        };
        let added = columns.pop().unwrap();

        assert_float64_field(&added, "mem_usage");
        assert_eq!(None, added.location);
        assert!(added.add_if_not_exists);
    }

    #[test]
    fn test_alter_expr_with_location_to_request() {
        let kind = Kind::AddColumns(AddColumns {
            add_columns: vec![
                AddColumn {
                    column_def: Some(float64_field_def("mem_usage")),
                    location: Some(Location {
                        location_type: LocationType::First.into(),
                        after_column_name: String::default(),
                    }),
                    add_if_not_exists: false,
                },
                AddColumn {
                    column_def: Some(float64_field_def("cpu_usage")),
                    location: Some(Location {
                        location_type: LocationType::After.into(),
                        after_column_name: "ts".to_string(),
                    }),
                    add_if_not_exists: true,
                },
            ],
        });

        let request = alter_expr_to_request(1, monitor_alter_expr("", "", kind), None).unwrap();
        assert_monitor_target(&request, "", "");

        let AlterKind::AddColumns { mut columns } = request.alter_kind else {
            unreachable!()
        };

        // Columns are popped from the back, so the second added column comes first.
        let cpu_usage = columns.pop().unwrap();
        assert_float64_field(&cpu_usage, "cpu_usage");
        assert_eq!(
            Some(AddColumnLocation::After {
                column_name: "ts".to_string()
            }),
            cpu_usage.location
        );
        assert!(cpu_usage.add_if_not_exists);

        let mem_usage = columns.pop().unwrap();
        assert_float64_field(&mem_usage, "mem_usage");
        assert_eq!(Some(AddColumnLocation::First), mem_usage.location);
        assert!(!mem_usage.add_if_not_exists);
    }

    #[test]
    fn test_modify_column_type_expr() {
        let kind = Kind::ModifyColumnTypes(ModifyColumnTypes {
            modify_column_types: vec![ModifyColumnType {
                column_name: "mem_usage".to_string(),
                target_type: ColumnDataType::String as i32,
                target_type_extension: None,
            }],
        });

        let expr = monitor_alter_expr("test_catalog", "test_schema", kind);
        let request = alter_expr_to_request(1, expr, None).unwrap();
        assert_monitor_target(&request, "test_catalog", "test_schema");

        let AlterKind::ModifyColumnTypes { mut columns } = request.alter_kind else {
            unreachable!()
        };

        let modified = columns.pop().unwrap();
        assert_eq!("mem_usage", modified.column_name);
        assert_eq!(ConcreteDataType::string_datatype(), modified.target_type);
    }

    #[test]
    fn test_drop_column_expr() {
        let kind = Kind::DropColumns(DropColumns {
            drop_columns: vec![DropColumn {
                name: "mem_usage".to_string(),
            }],
        });

        let expr = monitor_alter_expr("test_catalog", "test_schema", kind);
        let request = alter_expr_to_request(1, expr, None).unwrap();
        assert_monitor_target(&request, "test_catalog", "test_schema");

        let AlterKind::DropColumns { mut names } = request.alter_kind else {
            unreachable!()
        };
        assert_eq!(1, names.len());
        assert_eq!("mem_usage".to_string(), names.pop().unwrap());
    }
}