1use api::helper::ColumnDataTypeWrapper;
16use api::v1::add_column_location::LocationType;
17use api::v1::alter_table_expr::Kind;
18use api::v1::column_def::{
19 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
20};
21use api::v1::{
22 column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
23 DropColumns, FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable,
24 SemanticType, SkippingIndexType as PbSkippingIndexType,
25};
26use common_query::AddColumnLocation;
27use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
28use snafu::{ensure, OptionExt, ResultExt};
29use store_api::region_request::{SetRegionOption, UnsetRegionOption};
30use table::metadata::TableId;
31use table::requests::{
32 AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest, SetIndexOptions,
33 UnsetIndexOptions,
34};
35
36use crate::error::{
37 InvalidColumnDefSnafu, InvalidSetFulltextOptionRequestSnafu,
38 InvalidSetSkippingIndexOptionRequestSnafu, InvalidSetTableOptionRequestSnafu,
39 InvalidUnsetTableOptionRequestSnafu, MissingAlterIndexOptionSnafu, MissingFieldSnafu,
40 MissingTimestampColumnSnafu, Result, UnknownLocationTypeSnafu,
41};
42
43const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
44const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
45
46pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
48 let catalog_name = expr.catalog_name;
49 let schema_name = expr.schema_name;
50 let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
51 let alter_kind = match kind {
52 Kind::AddColumns(add_columns) => {
53 let add_column_requests = add_columns
54 .add_columns
55 .into_iter()
56 .map(|ac| {
57 let column_def = ac.column_def.context(MissingFieldSnafu {
58 field: "column_def",
59 })?;
60
61 let schema = column_def::try_as_column_schema(&column_def).context(
62 InvalidColumnDefSnafu {
63 column: &column_def.name,
64 },
65 )?;
66 Ok(AddColumnRequest {
67 column_schema: schema,
68 is_key: column_def.semantic_type == SemanticType::Tag as i32,
69 location: parse_location(ac.location)?,
70 add_if_not_exists: ac.add_if_not_exists,
71 })
72 })
73 .collect::<Result<Vec<_>>>()?;
74
75 AlterKind::AddColumns {
76 columns: add_column_requests,
77 }
78 }
79 Kind::ModifyColumnTypes(ModifyColumnTypes {
80 modify_column_types,
81 }) => {
82 let modify_column_type_requests = modify_column_types
83 .into_iter()
84 .map(|cct| {
85 let target_type =
86 ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
87 .into();
88
89 Ok(ModifyColumnTypeRequest {
90 column_name: cct.column_name,
91 target_type,
92 })
93 })
94 .collect::<Result<Vec<_>>>()?;
95
96 AlterKind::ModifyColumnTypes {
97 columns: modify_column_type_requests,
98 }
99 }
100 Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
101 names: drop_columns.into_iter().map(|c| c.name).collect(),
102 },
103 Kind::RenameTable(RenameTable { new_table_name }) => {
104 AlterKind::RenameTable { new_table_name }
105 }
106 Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
107 AlterKind::SetTableOptions {
108 options: table_options
109 .iter()
110 .map(SetRegionOption::try_from)
111 .collect::<std::result::Result<Vec<_>, _>>()
112 .context(InvalidSetTableOptionRequestSnafu)?,
113 }
114 }
115 Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
116 AlterKind::UnsetTableOptions {
117 keys: keys
118 .iter()
119 .map(|key| UnsetRegionOption::try_from(key.as_str()))
120 .collect::<std::result::Result<Vec<_>, _>>()
121 .context(InvalidUnsetTableOptionRequestSnafu)?,
122 }
123 }
124 Kind::SetIndex(o) => match o.options {
125 Some(opt) => match opt {
126 api::v1::set_index::Options::Fulltext(f) => AlterKind::SetIndex {
127 options: SetIndexOptions::Fulltext {
128 column_name: f.column_name.clone(),
129 options: FulltextOptions {
130 enable: f.enable,
131 analyzer: as_fulltext_option_analyzer(
132 Analyzer::try_from(f.analyzer)
133 .context(InvalidSetFulltextOptionRequestSnafu)?,
134 ),
135 case_sensitive: f.case_sensitive,
136 backend: as_fulltext_option_backend(
137 PbFulltextBackend::try_from(f.backend)
138 .context(InvalidSetFulltextOptionRequestSnafu)?,
139 ),
140 },
141 },
142 },
143 api::v1::set_index::Options::Inverted(i) => AlterKind::SetIndex {
144 options: SetIndexOptions::Inverted {
145 column_name: i.column_name,
146 },
147 },
148 api::v1::set_index::Options::Skipping(s) => AlterKind::SetIndex {
149 options: SetIndexOptions::Skipping {
150 column_name: s.column_name,
151 options: SkippingIndexOptions {
152 granularity: s.granularity as u32,
153 index_type: as_skipping_index_type(
154 PbSkippingIndexType::try_from(s.skipping_index_type)
155 .context(InvalidSetSkippingIndexOptionRequestSnafu)?,
156 ),
157 },
158 },
159 },
160 },
161 None => return MissingAlterIndexOptionSnafu.fail(),
162 },
163 Kind::UnsetIndex(o) => match o.options {
164 Some(opt) => match opt {
165 api::v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
166 options: UnsetIndexOptions::Fulltext {
167 column_name: f.column_name,
168 },
169 },
170 api::v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
171 options: UnsetIndexOptions::Inverted {
172 column_name: i.column_name,
173 },
174 },
175 api::v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
176 options: UnsetIndexOptions::Skipping {
177 column_name: s.column_name,
178 },
179 },
180 },
181 None => return MissingAlterIndexOptionSnafu.fail(),
182 },
183 Kind::DropDefaults(o) => {
184 let names = o
185 .drop_defaults
186 .into_iter()
187 .map(|col| {
188 ensure!(
189 !col.column_name.is_empty(),
190 MissingFieldSnafu {
191 field: "column_name"
192 }
193 );
194 Ok(col.column_name)
195 })
196 .collect::<Result<Vec<_>>>()?;
197 AlterKind::DropDefaults { names }
198 }
199 };
200
201 let request = AlterTableRequest {
202 catalog_name,
203 schema_name,
204 table_name: expr.table_name,
205 table_id,
206 alter_kind,
207 table_version: None,
208 };
209 Ok(request)
210}
211
212pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Result<RawSchema> {
213 let column_schemas = expr
214 .column_defs
215 .iter()
216 .map(|x| {
217 column_def::try_as_column_schema(x).context(InvalidColumnDefSnafu { column: &x.name })
218 })
219 .collect::<Result<Vec<ColumnSchema>>>()?;
220
221 if require_time_index {
223 ensure!(
224 column_schemas
225 .iter()
226 .any(|column| column.name == expr.time_index),
227 MissingTimestampColumnSnafu {
228 msg: format!("CreateExpr: {expr:?}")
229 }
230 );
231 }
232
233 let column_schemas = column_schemas
234 .into_iter()
235 .map(|column_schema| {
236 if column_schema.name == expr.time_index {
237 column_schema.with_time_index(true)
238 } else {
239 column_schema
240 }
241 })
242 .collect::<Vec<_>>();
243
244 Ok(RawSchema::new(column_schemas))
245}
246
247fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
248 match location {
249 Some(Location {
250 location_type: LOCATION_TYPE_FIRST,
251 ..
252 }) => Ok(Some(AddColumnLocation::First)),
253 Some(Location {
254 location_type: LOCATION_TYPE_AFTER,
255 after_column_name,
256 }) => Ok(Some(AddColumnLocation::After {
257 column_name: after_column_name,
258 })),
259 Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(),
260 None => Ok(None),
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use api::v1::{
267 AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
268 SemanticType,
269 };
270 use datatypes::prelude::ConcreteDataType;
271
272 use super::*;
273
274 #[test]
275 fn test_alter_expr_to_request() {
276 let expr = AlterTableExpr {
277 catalog_name: String::default(),
278 schema_name: String::default(),
279 table_name: "monitor".to_string(),
280
281 kind: Some(Kind::AddColumns(AddColumns {
282 add_columns: vec![AddColumn {
283 column_def: Some(ColumnDef {
284 name: "mem_usage".to_string(),
285 data_type: ColumnDataType::Float64 as i32,
286 is_nullable: false,
287 default_constraint: vec![],
288 semantic_type: SemanticType::Field as i32,
289 comment: String::new(),
290 ..Default::default()
291 }),
292 location: None,
293 add_if_not_exists: true,
294 }],
295 })),
296 };
297
298 let alter_request = alter_expr_to_request(1, expr).unwrap();
299 assert_eq!(alter_request.catalog_name, "");
300 assert_eq!(alter_request.schema_name, "");
301 assert_eq!("monitor".to_string(), alter_request.table_name);
302 let add_column = match alter_request.alter_kind {
303 AlterKind::AddColumns { mut columns } => columns.pop().unwrap(),
304 _ => unreachable!(),
305 };
306
307 assert!(!add_column.is_key);
308 assert_eq!("mem_usage", add_column.column_schema.name);
309 assert_eq!(
310 ConcreteDataType::float64_datatype(),
311 add_column.column_schema.data_type
312 );
313 assert_eq!(None, add_column.location);
314 assert!(add_column.add_if_not_exists);
315 }
316
317 #[test]
318 fn test_alter_expr_with_location_to_request() {
319 let expr = AlterTableExpr {
320 catalog_name: String::default(),
321 schema_name: String::default(),
322 table_name: "monitor".to_string(),
323
324 kind: Some(Kind::AddColumns(AddColumns {
325 add_columns: vec![
326 AddColumn {
327 column_def: Some(ColumnDef {
328 name: "mem_usage".to_string(),
329 data_type: ColumnDataType::Float64 as i32,
330 is_nullable: false,
331 default_constraint: vec![],
332 semantic_type: SemanticType::Field as i32,
333 comment: String::new(),
334 ..Default::default()
335 }),
336 location: Some(Location {
337 location_type: LocationType::First.into(),
338 after_column_name: String::default(),
339 }),
340 add_if_not_exists: false,
341 },
342 AddColumn {
343 column_def: Some(ColumnDef {
344 name: "cpu_usage".to_string(),
345 data_type: ColumnDataType::Float64 as i32,
346 is_nullable: false,
347 default_constraint: vec![],
348 semantic_type: SemanticType::Field as i32,
349 comment: String::new(),
350 ..Default::default()
351 }),
352 location: Some(Location {
353 location_type: LocationType::After.into(),
354 after_column_name: "ts".to_string(),
355 }),
356 add_if_not_exists: true,
357 },
358 ],
359 })),
360 };
361
362 let alter_request = alter_expr_to_request(1, expr).unwrap();
363 assert_eq!(alter_request.catalog_name, "");
364 assert_eq!(alter_request.schema_name, "");
365 assert_eq!("monitor".to_string(), alter_request.table_name);
366
367 let mut add_columns = match alter_request.alter_kind {
368 AlterKind::AddColumns { columns } => columns,
369 _ => unreachable!(),
370 };
371
372 let add_column = add_columns.pop().unwrap();
373 assert!(!add_column.is_key);
374 assert_eq!("cpu_usage", add_column.column_schema.name);
375 assert_eq!(
376 ConcreteDataType::float64_datatype(),
377 add_column.column_schema.data_type
378 );
379 assert_eq!(
380 Some(AddColumnLocation::After {
381 column_name: "ts".to_string()
382 }),
383 add_column.location
384 );
385 assert!(add_column.add_if_not_exists);
386
387 let add_column = add_columns.pop().unwrap();
388 assert!(!add_column.is_key);
389 assert_eq!("mem_usage", add_column.column_schema.name);
390 assert_eq!(
391 ConcreteDataType::float64_datatype(),
392 add_column.column_schema.data_type
393 );
394 assert_eq!(Some(AddColumnLocation::First), add_column.location);
395 assert!(!add_column.add_if_not_exists);
396 }
397
398 #[test]
399 fn test_modify_column_type_expr() {
400 let expr = AlterTableExpr {
401 catalog_name: "test_catalog".to_string(),
402 schema_name: "test_schema".to_string(),
403 table_name: "monitor".to_string(),
404
405 kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
406 modify_column_types: vec![ModifyColumnType {
407 column_name: "mem_usage".to_string(),
408 target_type: ColumnDataType::String as i32,
409 target_type_extension: None,
410 }],
411 })),
412 };
413
414 let alter_request = alter_expr_to_request(1, expr).unwrap();
415 assert_eq!(alter_request.catalog_name, "test_catalog");
416 assert_eq!(alter_request.schema_name, "test_schema");
417 assert_eq!("monitor".to_string(), alter_request.table_name);
418
419 let mut modify_column_types = match alter_request.alter_kind {
420 AlterKind::ModifyColumnTypes { columns } => columns,
421 _ => unreachable!(),
422 };
423
424 let modify_column_type = modify_column_types.pop().unwrap();
425 assert_eq!("mem_usage", modify_column_type.column_name);
426 assert_eq!(
427 ConcreteDataType::string_datatype(),
428 modify_column_type.target_type
429 );
430 }
431
432 #[test]
433 fn test_drop_column_expr() {
434 let expr = AlterTableExpr {
435 catalog_name: "test_catalog".to_string(),
436 schema_name: "test_schema".to_string(),
437 table_name: "monitor".to_string(),
438
439 kind: Some(Kind::DropColumns(DropColumns {
440 drop_columns: vec![DropColumn {
441 name: "mem_usage".to_string(),
442 }],
443 })),
444 };
445
446 let alter_request = alter_expr_to_request(1, expr).unwrap();
447 assert_eq!(alter_request.catalog_name, "test_catalog");
448 assert_eq!(alter_request.schema_name, "test_schema");
449 assert_eq!("monitor".to_string(), alter_request.table_name);
450
451 let mut drop_names = match alter_request.alter_kind {
452 AlterKind::DropColumns { names } => names,
453 _ => unreachable!(),
454 };
455 assert_eq!(1, drop_names.len());
456 assert_eq!("mem_usage".to_string(), drop_names.pop().unwrap());
457 }
458}