1use api::helper::ColumnDataTypeWrapper;
16use api::v1::add_column_location::LocationType;
17use api::v1::alter_table_expr::Kind;
18use api::v1::column_def::{
19 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
20};
21use api::v1::{
22 column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
23 DropColumns, FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable,
24 SemanticType, SkippingIndexType as PbSkippingIndexType,
25};
26use common_query::AddColumnLocation;
27use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
28use snafu::{ensure, OptionExt, ResultExt};
29use store_api::region_request::{SetRegionOption, UnsetRegionOption};
30use table::metadata::{TableId, TableMeta};
31use table::requests::{
32 AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest, SetDefaultRequest,
33 SetIndexOption, UnsetIndexOption,
34};
35
36use crate::error::{
37 ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
38 InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu,
39 InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu,
40 MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu,
41 MissingTimestampColumnSnafu, Result, UnknownLocationTypeSnafu,
42};
43
44const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
45const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
46
47fn set_index_option_from_proto(set_index: api::v1::SetIndex) -> Result<SetIndexOption> {
48 let options = set_index.options.context(MissingAlterIndexOptionSnafu)?;
49 Ok(match options {
50 api::v1::set_index::Options::Fulltext(f) => SetIndexOption::Fulltext {
51 column_name: f.column_name.clone(),
52 options: FulltextOptions::new(
53 f.enable,
54 as_fulltext_option_analyzer(
55 Analyzer::try_from(f.analyzer).context(InvalidSetFulltextOptionRequestSnafu)?,
56 ),
57 f.case_sensitive,
58 as_fulltext_option_backend(
59 PbFulltextBackend::try_from(f.backend)
60 .context(InvalidSetFulltextOptionRequestSnafu)?,
61 ),
62 f.granularity as u32,
63 f.false_positive_rate,
64 )
65 .context(InvalidIndexOptionSnafu)?,
66 },
67 api::v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
68 column_name: i.column_name,
69 },
70 api::v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
71 column_name: s.column_name,
72 options: SkippingIndexOptions::new(
73 s.granularity as u32,
74 s.false_positive_rate,
75 as_skipping_index_type(
76 PbSkippingIndexType::try_from(s.skipping_index_type)
77 .context(InvalidSetSkippingIndexOptionRequestSnafu)?,
78 ),
79 )
80 .context(InvalidIndexOptionSnafu)?,
81 },
82 })
83}
84
85fn unset_index_option_from_proto(unset_index: api::v1::UnsetIndex) -> Result<UnsetIndexOption> {
86 let options = unset_index.options.context(MissingAlterIndexOptionSnafu)?;
87 Ok(match options {
88 api::v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
89 column_name: f.column_name,
90 },
91 api::v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
92 column_name: i.column_name,
93 },
94 api::v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
95 column_name: s.column_name,
96 },
97 })
98}
99
100pub fn alter_expr_to_request(
104 table_id: TableId,
105 expr: AlterTableExpr,
106 table_meta: Option<&TableMeta>,
107) -> Result<AlterTableRequest> {
108 let catalog_name = expr.catalog_name;
109 let schema_name = expr.schema_name;
110 let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
111 let alter_kind = match kind {
112 Kind::AddColumns(add_columns) => {
113 let add_column_requests = add_columns
114 .add_columns
115 .into_iter()
116 .map(|ac| {
117 let column_def = ac.column_def.context(MissingFieldSnafu {
118 field: "column_def",
119 })?;
120
121 let schema = column_def::try_as_column_schema(&column_def).context(
122 InvalidColumnDefSnafu {
123 column: &column_def.name,
124 },
125 )?;
126 Ok(AddColumnRequest {
127 column_schema: schema,
128 is_key: column_def.semantic_type == SemanticType::Tag as i32,
129 location: parse_location(ac.location)?,
130 add_if_not_exists: ac.add_if_not_exists,
131 })
132 })
133 .collect::<Result<Vec<_>>>()?;
134
135 AlterKind::AddColumns {
136 columns: add_column_requests,
137 }
138 }
139 Kind::ModifyColumnTypes(ModifyColumnTypes {
140 modify_column_types,
141 }) => {
142 let modify_column_type_requests = modify_column_types
143 .into_iter()
144 .map(|cct| {
145 let target_type =
146 ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
147 .into();
148
149 Ok(ModifyColumnTypeRequest {
150 column_name: cct.column_name,
151 target_type,
152 })
153 })
154 .collect::<Result<Vec<_>>>()?;
155
156 AlterKind::ModifyColumnTypes {
157 columns: modify_column_type_requests,
158 }
159 }
160 Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
161 names: drop_columns.into_iter().map(|c| c.name).collect(),
162 },
163 Kind::RenameTable(RenameTable { new_table_name }) => {
164 AlterKind::RenameTable { new_table_name }
165 }
166 Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
167 AlterKind::SetTableOptions {
168 options: table_options
169 .iter()
170 .map(SetRegionOption::try_from)
171 .collect::<std::result::Result<Vec<_>, _>>()
172 .context(InvalidSetTableOptionRequestSnafu)?,
173 }
174 }
175 Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
176 AlterKind::UnsetTableOptions {
177 keys: keys
178 .iter()
179 .map(|key| UnsetRegionOption::try_from(key.as_str()))
180 .collect::<std::result::Result<Vec<_>, _>>()
181 .context(InvalidUnsetTableOptionRequestSnafu)?,
182 }
183 }
184 Kind::SetIndex(o) => {
185 let option = set_index_option_from_proto(o)?;
186 AlterKind::SetIndexes {
187 options: vec![option],
188 }
189 }
190 Kind::UnsetIndex(o) => {
191 let option = unset_index_option_from_proto(o)?;
192 AlterKind::UnsetIndexes {
193 options: vec![option],
194 }
195 }
196 Kind::SetIndexes(o) => {
197 let options = o
198 .set_indexes
199 .into_iter()
200 .map(set_index_option_from_proto)
201 .collect::<Result<Vec<_>>>()?;
202 AlterKind::SetIndexes { options }
203 }
204 Kind::UnsetIndexes(o) => {
205 let options = o
206 .unset_indexes
207 .into_iter()
208 .map(unset_index_option_from_proto)
209 .collect::<Result<Vec<_>>>()?;
210 AlterKind::UnsetIndexes { options }
211 }
212 Kind::DropDefaults(o) => {
213 let names = o
214 .drop_defaults
215 .into_iter()
216 .map(|col| {
217 ensure!(
218 !col.column_name.is_empty(),
219 MissingFieldSnafu {
220 field: "column_name"
221 }
222 );
223 Ok(col.column_name)
224 })
225 .collect::<Result<Vec<_>>>()?;
226 AlterKind::DropDefaults { names }
227 }
228 Kind::SetDefaults(o) => {
229 let table_meta = table_meta.context(MissingTableMetaSnafu { table_id })?;
230 let defaults = o
231 .set_defaults
232 .into_iter()
233 .map(|col| {
234 let column_scheme = table_meta
235 .schema
236 .column_schema_by_name(&col.column_name)
237 .context(ColumnNotFoundSnafu {
238 column_name: &col.column_name,
239 })?;
240 let default_constraint = common_sql::convert::deserialize_default_constraint(
241 col.default_constraint.as_slice(),
242 &col.column_name,
243 &column_scheme.data_type,
244 )
245 .context(crate::error::SqlCommonSnafu)?;
246 Ok(SetDefaultRequest {
247 column_name: col.column_name,
248 default_constraint,
249 })
250 })
251 .collect::<Result<Vec<_>>>()?;
252 AlterKind::SetDefaults { defaults }
253 }
254 };
255
256 let request = AlterTableRequest {
257 catalog_name,
258 schema_name,
259 table_name: expr.table_name,
260 table_id,
261 alter_kind,
262 table_version: None,
263 };
264 Ok(request)
265}
266
267pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<RawSchema> {
268 let column_schemas = expr
269 .column_defs
270 .iter()
271 .map(|x| {
272 column_def::try_as_column_schema(x).context(InvalidColumnDefSnafu { column: &x.name })
273 })
274 .collect::<Result<Vec<ColumnSchema>>>()?;
275
276 if require_time_index {
278 ensure!(
279 column_schemas
280 .iter()
281 .any(|column| column.name == expr.time_index),
282 MissingTimestampColumnSnafu {
283 msg: format!("CreateExpr: {expr:?}")
284 }
285 );
286 }
287
288 let column_schemas = column_schemas
289 .into_iter()
290 .map(|column_schema| {
291 if column_schema.name == expr.time_index {
292 column_schema.with_time_index(true)
293 } else {
294 column_schema
295 }
296 })
297 .collect::<Vec<_>>();
298
299 Ok(RawSchema::new(column_schemas))
300}
301
302fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
303 match location {
304 Some(Location {
305 location_type: LOCATION_TYPE_FIRST,
306 ..
307 }) => Ok(Some(AddColumnLocation::First)),
308 Some(Location {
309 location_type: LOCATION_TYPE_AFTER,
310 after_column_name,
311 }) => Ok(Some(AddColumnLocation::After {
312 column_name: after_column_name,
313 })),
314 Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(),
315 None => Ok(None),
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use api::v1::{
322 AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
323 SemanticType,
324 };
325 use datatypes::prelude::ConcreteDataType;
326
327 use super::*;
328
329 #[test]
330 fn test_alter_expr_to_request() {
331 let expr = AlterTableExpr {
332 catalog_name: String::default(),
333 schema_name: String::default(),
334 table_name: "monitor".to_string(),
335
336 kind: Some(Kind::AddColumns(AddColumns {
337 add_columns: vec![AddColumn {
338 column_def: Some(ColumnDef {
339 name: "mem_usage".to_string(),
340 data_type: ColumnDataType::Float64 as i32,
341 is_nullable: false,
342 default_constraint: vec![],
343 semantic_type: SemanticType::Field as i32,
344 comment: String::new(),
345 ..Default::default()
346 }),
347 location: None,
348 add_if_not_exists: true,
349 }],
350 })),
351 };
352
353 let alter_request = alter_expr_to_request(1, expr, None).unwrap();
354 assert_eq!(alter_request.catalog_name, "");
355 assert_eq!(alter_request.schema_name, "");
356 assert_eq!("monitor".to_string(), alter_request.table_name);
357 let add_column = match alter_request.alter_kind {
358 AlterKind::AddColumns { mut columns } => columns.pop().unwrap(),
359 _ => unreachable!(),
360 };
361
362 assert!(!add_column.is_key);
363 assert_eq!("mem_usage", add_column.column_schema.name);
364 assert_eq!(
365 ConcreteDataType::float64_datatype(),
366 add_column.column_schema.data_type
367 );
368 assert_eq!(None, add_column.location);
369 assert!(add_column.add_if_not_exists);
370 }
371
372 #[test]
373 fn test_alter_expr_with_location_to_request() {
374 let expr = AlterTableExpr {
375 catalog_name: String::default(),
376 schema_name: String::default(),
377 table_name: "monitor".to_string(),
378
379 kind: Some(Kind::AddColumns(AddColumns {
380 add_columns: vec![
381 AddColumn {
382 column_def: Some(ColumnDef {
383 name: "mem_usage".to_string(),
384 data_type: ColumnDataType::Float64 as i32,
385 is_nullable: false,
386 default_constraint: vec![],
387 semantic_type: SemanticType::Field as i32,
388 comment: String::new(),
389 ..Default::default()
390 }),
391 location: Some(Location {
392 location_type: LocationType::First.into(),
393 after_column_name: String::default(),
394 }),
395 add_if_not_exists: false,
396 },
397 AddColumn {
398 column_def: Some(ColumnDef {
399 name: "cpu_usage".to_string(),
400 data_type: ColumnDataType::Float64 as i32,
401 is_nullable: false,
402 default_constraint: vec![],
403 semantic_type: SemanticType::Field as i32,
404 comment: String::new(),
405 ..Default::default()
406 }),
407 location: Some(Location {
408 location_type: LocationType::After.into(),
409 after_column_name: "ts".to_string(),
410 }),
411 add_if_not_exists: true,
412 },
413 ],
414 })),
415 };
416
417 let alter_request = alter_expr_to_request(1, expr, None).unwrap();
418 assert_eq!(alter_request.catalog_name, "");
419 assert_eq!(alter_request.schema_name, "");
420 assert_eq!("monitor".to_string(), alter_request.table_name);
421
422 let mut add_columns = match alter_request.alter_kind {
423 AlterKind::AddColumns { columns } => columns,
424 _ => unreachable!(),
425 };
426
427 let add_column = add_columns.pop().unwrap();
428 assert!(!add_column.is_key);
429 assert_eq!("cpu_usage", add_column.column_schema.name);
430 assert_eq!(
431 ConcreteDataType::float64_datatype(),
432 add_column.column_schema.data_type
433 );
434 assert_eq!(
435 Some(AddColumnLocation::After {
436 column_name: "ts".to_string()
437 }),
438 add_column.location
439 );
440 assert!(add_column.add_if_not_exists);
441
442 let add_column = add_columns.pop().unwrap();
443 assert!(!add_column.is_key);
444 assert_eq!("mem_usage", add_column.column_schema.name);
445 assert_eq!(
446 ConcreteDataType::float64_datatype(),
447 add_column.column_schema.data_type
448 );
449 assert_eq!(Some(AddColumnLocation::First), add_column.location);
450 assert!(!add_column.add_if_not_exists);
451 }
452
453 #[test]
454 fn test_modify_column_type_expr() {
455 let expr = AlterTableExpr {
456 catalog_name: "test_catalog".to_string(),
457 schema_name: "test_schema".to_string(),
458 table_name: "monitor".to_string(),
459
460 kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
461 modify_column_types: vec![ModifyColumnType {
462 column_name: "mem_usage".to_string(),
463 target_type: ColumnDataType::String as i32,
464 target_type_extension: None,
465 }],
466 })),
467 };
468
469 let alter_request = alter_expr_to_request(1, expr, None).unwrap();
470 assert_eq!(alter_request.catalog_name, "test_catalog");
471 assert_eq!(alter_request.schema_name, "test_schema");
472 assert_eq!("monitor".to_string(), alter_request.table_name);
473
474 let mut modify_column_types = match alter_request.alter_kind {
475 AlterKind::ModifyColumnTypes { columns } => columns,
476 _ => unreachable!(),
477 };
478
479 let modify_column_type = modify_column_types.pop().unwrap();
480 assert_eq!("mem_usage", modify_column_type.column_name);
481 assert_eq!(
482 ConcreteDataType::string_datatype(),
483 modify_column_type.target_type
484 );
485 }
486
487 #[test]
488 fn test_drop_column_expr() {
489 let expr = AlterTableExpr {
490 catalog_name: "test_catalog".to_string(),
491 schema_name: "test_schema".to_string(),
492 table_name: "monitor".to_string(),
493
494 kind: Some(Kind::DropColumns(DropColumns {
495 drop_columns: vec![DropColumn {
496 name: "mem_usage".to_string(),
497 }],
498 })),
499 };
500
501 let alter_request = alter_expr_to_request(1, expr, None).unwrap();
502 assert_eq!(alter_request.catalog_name, "test_catalog");
503 assert_eq!(alter_request.schema_name, "test_schema");
504 assert_eq!("monitor".to_string(), alter_request.table_name);
505
506 let mut drop_names = match alter_request.alter_kind {
507 AlterKind::DropColumns { names } => names,
508 _ => unreachable!(),
509 };
510 assert_eq!(1, drop_names.len());
511 assert_eq!("mem_usage".to_string(), drop_names.pop().unwrap());
512 }
513}