1use std::collections::HashSet;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::column_def::{contains_fulltext, contains_skipping};
19use api::v1::{
20 AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef,
21 ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType,
22};
23use datatypes::schema::Schema;
24use snafu::{OptionExt, ResultExt, ensure};
25use table::metadata::TableId;
26use table::table_reference::TableReference;
27
28use crate::error::{
29 self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu,
30 InvalidStringIndexColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
31 UnknownColumnDataTypeSnafu,
32};
33pub struct ColumnExpr<'a> {
34 pub column_name: &'a str,
35 pub datatype: i32,
36 pub semantic_type: i32,
37 pub datatype_extension: &'a Option<ColumnDataTypeExtension>,
38 pub options: &'a Option<ColumnOptions>,
39}
40
41impl<'a> ColumnExpr<'a> {
42 #[inline]
43 pub fn from_columns(columns: &'a [Column]) -> Vec<Self> {
44 columns.iter().map(Self::from).collect()
45 }
46
47 #[inline]
48 pub fn from_column_schemas(schemas: &'a [ColumnSchema]) -> Vec<Self> {
49 schemas.iter().map(Self::from).collect()
50 }
51}
52
53impl<'a> From<&'a Column> for ColumnExpr<'a> {
54 fn from(column: &'a Column) -> Self {
55 Self {
56 column_name: &column.column_name,
57 datatype: column.datatype,
58 semantic_type: column.semantic_type,
59 datatype_extension: &column.datatype_extension,
60 options: &column.options,
61 }
62 }
63}
64
65impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> {
66 fn from(schema: &'a ColumnSchema) -> Self {
67 Self {
68 column_name: &schema.column_name,
69 datatype: schema.datatype,
70 semantic_type: schema.semantic_type,
71 datatype_extension: &schema.datatype_extension,
72 options: &schema.options,
73 }
74 }
75}
76
77fn infer_column_datatype(
78 datatype: i32,
79 datatype_extension: &Option<ColumnDataTypeExtension>,
80) -> Result<ColumnDataType> {
81 let column_type =
82 ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?;
83
84 if matches!(&column_type, ColumnDataType::Binary)
85 && let Some(ext) = datatype_extension
86 {
87 let type_ext = ext
88 .type_ext
89 .as_ref()
90 .context(error::MissingFieldSnafu { field: "type_ext" })?;
91 if *type_ext == TypeExt::JsonType(JsonTypeExtension::JsonBinary.into()) {
92 return Ok(ColumnDataType::Json);
93 }
94 }
95
96 Ok(column_type)
97}
98
99pub fn build_create_table_expr(
100 table_id: Option<TableId>,
101 table_name: &TableReference<'_>,
102 column_exprs: Vec<ColumnExpr>,
103 engine: &str,
104 desc: &str,
105) -> Result<CreateTableExpr> {
106 let mut distinct_names = HashSet::with_capacity(column_exprs.len());
114 for ColumnExpr { column_name, .. } in &column_exprs {
115 ensure!(
116 distinct_names.insert(*column_name),
117 DuplicatedColumnNameSnafu { name: *column_name }
118 );
119 }
120
121 let mut column_defs = Vec::with_capacity(column_exprs.len());
122 let mut primary_keys = Vec::with_capacity(column_exprs.len());
123 let mut time_index = None;
124
125 for expr in column_exprs {
126 let ColumnExpr {
127 column_name,
128 datatype,
129 semantic_type,
130 datatype_extension,
131 options,
132 } = expr;
133
134 let mut is_nullable = true;
135 match semantic_type {
136 v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.to_owned()),
137 v if v == SemanticType::Timestamp as i32 => {
138 ensure!(
139 time_index.is_none(),
140 DuplicatedTimestampColumnSnafu {
141 exists: time_index.as_ref().unwrap(),
142 duplicated: column_name,
143 }
144 );
145 time_index = Some(column_name.to_owned());
146 is_nullable = false;
148 }
149 _ => {}
150 }
151
152 let column_type = infer_column_datatype(datatype, datatype_extension)?;
153
154 ensure!(
155 (!contains_fulltext(options) && !contains_skipping(options))
156 || column_type == ColumnDataType::String,
157 InvalidStringIndexColumnTypeSnafu {
158 column_name,
159 column_type,
160 }
161 );
162
163 column_defs.push(ColumnDef {
164 name: column_name.to_owned(),
165 data_type: datatype,
166 is_nullable,
167 default_constraint: vec![],
168 semantic_type,
169 comment: String::new(),
170 datatype_extension: *datatype_extension,
171 options: options.clone(),
172 });
173 }
174
175 let time_index = time_index.context(MissingTimestampColumnSnafu {
176 msg: format!("table is {}", table_name.table),
177 })?;
178
179 Ok(CreateTableExpr {
180 catalog_name: table_name.catalog.to_string(),
181 schema_name: table_name.schema.to_string(),
182 table_name: table_name.table.to_string(),
183 desc: desc.to_string(),
184 column_defs,
185 time_index,
186 primary_keys,
187 create_if_not_exists: true,
188 table_options: Default::default(),
189 table_id: table_id.map(|id| api::v1::TableId { id }),
190 engine: engine.to_string(),
191 })
192}
193
194pub fn extract_new_columns(
198 schema: &Schema,
199 column_exprs: Vec<ColumnExpr>,
200) -> Result<Option<AddColumns>> {
201 let columns_to_add = column_exprs
202 .into_iter()
203 .filter(|expr| schema.column_schema_by_name(expr.column_name).is_none())
204 .map(|expr| {
205 let column_def = Some(ColumnDef {
206 name: expr.column_name.to_string(),
207 data_type: expr.datatype,
208 is_nullable: true,
209 default_constraint: vec![],
210 semantic_type: expr.semantic_type,
211 comment: String::new(),
212 datatype_extension: *expr.datatype_extension,
213 options: expr.options.clone(),
214 });
215 AddColumn {
216 column_def,
217 location: None,
218 add_if_not_exists: true,
219 }
220 })
221 .collect::<Vec<_>>();
222
223 if columns_to_add.is_empty() {
224 Ok(None)
225 } else {
226 let mut distinct_names = HashSet::with_capacity(columns_to_add.len());
227 for add_column in &columns_to_add {
228 let name = add_column.column_def.as_ref().unwrap().name.as_str();
229 ensure!(
230 distinct_names.insert(name),
231 DuplicatedColumnNameSnafu { name }
232 );
233 }
234
235 Ok(Some(AddColumns {
236 add_columns: columns_to_add,
237 }))
238 }
239}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::column::Values;
    use api::v1::column_data_type_extension::TypeExt;
    use api::v1::{
        Column, ColumnDataType, ColumnDataTypeExtension, Decimal128, DecimalTypeExtension,
        IntervalMonthDayNano, SemanticType,
    };
    use common_catalog::consts::MITO_ENGINE;
    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SchemaBuilder};
    use snafu::ResultExt;

    use super::*;
    use crate::error;
    use crate::error::ColumnDataTypeSnafu;

    /// Builds a `datatypes` column schema from a raw `ColumnDataType` value.
    fn build_column_schema(
        column_name: &str,
        datatype: i32,
        nullable: bool,
    ) -> error::Result<ColumnSchema> {
        let datatype_wrapper =
            ColumnDataTypeWrapper::try_new(datatype, None).context(ColumnDataTypeSnafu)?;

        Ok(ColumnSchema::new(
            column_name,
            datatype_wrapper.into(),
            nullable,
        ))
    }

    /// Resolves the concrete datatype a `ColumnDef` describes, honoring its extension.
    fn concrete_type_of(column_def: &ColumnDef) -> ConcreteDataType {
        ConcreteDataType::from(
            ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)
                .unwrap(),
        )
    }

    /// Builds a `CreateTableExpr` from insertion `columns`, described as
    /// "Created on insertion".
    fn build_create_expr_from_insertion(
        catalog_name: &str,
        schema_name: &str,
        table_id: Option<TableId>,
        table_name: &str,
        columns: &[Column],
        engine: &str,
    ) -> Result<CreateTableExpr> {
        let table_name = TableReference::full(catalog_name, schema_name, table_name);
        let column_exprs = ColumnExpr::from_columns(columns);
        build_create_table_expr(
            table_id,
            &table_name,
            column_exprs,
            engine,
            "Created on insertion",
        )
    }

    #[test]
    fn test_build_create_table_request() {
        let table_id = Some(10);
        let table_name = "test_metric";

        // Without any columns there is no timestamp column to use as the time index.
        assert!(
            build_create_expr_from_insertion("", "", table_id, table_name, &[], MITO_ENGINE)
                .is_err()
        );

        let insert_batch = mock_insert_batch();

        let create_expr = build_create_expr_from_insertion(
            "",
            "",
            table_id,
            table_name,
            &insert_batch.0,
            MITO_ENGINE,
        )
        .unwrap();

        assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
        assert_eq!(table_name, create_expr.table_name);
        assert_eq!("Created on insertion".to_string(), create_expr.desc);
        assert_eq!(
            vec![create_expr.column_defs[0].name.clone()],
            create_expr.primary_keys
        );

        let column_defs = create_expr.column_defs;
        assert_eq!(column_defs[5].name, create_expr.time_index);
        assert_eq!(7, column_defs.len());

        // Only the time index column is non-nullable.
        for column_def in &column_defs {
            assert_eq!(
                column_def.name != create_expr.time_index,
                column_def.is_nullable
            );
        }

        let data_type_of = |name: &str| {
            let column_def = column_defs.iter().find(|c| c.name == name).unwrap();
            concrete_type_of(column_def)
        };

        assert_eq!(ConcreteDataType::string_datatype(), data_type_of("host"));
        assert_eq!(ConcreteDataType::float64_datatype(), data_type_of("cpu"));
        assert_eq!(ConcreteDataType::float64_datatype(), data_type_of("memory"));
        assert_eq!(
            ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            data_type_of("time")
        );
        assert_eq!(
            ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            data_type_of("interval")
        );
        assert_eq!(
            ConcreteDataType::timestamp_millisecond_datatype(),
            data_type_of("ts")
        );
        assert_eq!(
            ConcreteDataType::decimal128_datatype(38, 10),
            data_type_of("decimals")
        );
    }

    #[test]
    fn test_build_create_table_request_rejects_duplicates() {
        let (columns, _) = mock_insert_batch();

        // A column name may appear only once.
        let mut duplicated_name = columns.clone();
        duplicated_name.push(columns[1].clone());
        assert!(
            build_create_expr_from_insertion("", "", None, "t", &duplicated_name, MITO_ENGINE)
                .is_err()
        );

        // Only one column may carry the timestamp semantic type.
        let mut second_ts = columns[5].clone();
        second_ts.column_name = "ts2".to_string();
        let mut duplicated_ts = columns;
        duplicated_ts.push(second_ts);
        assert!(
            build_create_expr_from_insertion("", "", None, "t", &duplicated_ts, MITO_ENGINE)
                .is_err()
        );
    }

    #[test]
    fn test_find_new_columns() {
        let cpu_column = build_column_schema("cpu", 10, true).unwrap();
        let ts_column = build_column_schema("ts", 15, false)
            .unwrap()
            .with_time_index(true);
        let columns = vec![cpu_column, ts_column];

        let schema = Arc::new(SchemaBuilder::try_from(columns).unwrap().build().unwrap());

        assert!(
            extract_new_columns(&schema, ColumnExpr::from_columns(&[]))
                .unwrap()
                .is_none()
        );

        let insert_batch = mock_insert_batch();

        let add_columns = extract_new_columns(&schema, ColumnExpr::from_columns(&insert_batch.0))
            .unwrap()
            .unwrap();

        // `cpu` and `ts` already exist in the schema; the remaining five columns are
        // added in their insertion order.
        let expected = [
            ("host", ConcreteDataType::string_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        assert_eq!(expected.len(), add_columns.add_columns.len());

        for (add_column, (name, data_type)) in add_columns.add_columns.iter().zip(expected) {
            let column_def = add_column.column_def.as_ref().unwrap();
            assert_eq!(name, column_def.name);
            assert_eq!(data_type, concrete_type_of(column_def));
            assert!(column_def.is_nullable);
            // Check each column's own flag, not just the first one's.
            assert!(add_column.add_if_not_exists);
        }
    }

    /// Returns seven mock columns (one tag, five fields, one timestamp) and a row count.
    fn mock_insert_batch() -> (Vec<Column>, u32) {
        let row_count = 2;

        let host_vals = Values {
            string_values: vec!["host1".to_string(), "host2".to_string()],
            ..Default::default()
        };
        let host_column = Column {
            column_name: "host".to_string(),
            semantic_type: SemanticType::Tag as i32,
            values: Some(host_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::String as i32,
            ..Default::default()
        };

        let cpu_vals = Values {
            f64_values: vec![0.31],
            ..Default::default()
        };
        let cpu_column = Column {
            column_name: "cpu".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(cpu_vals),
            null_mask: vec![2],
            datatype: ColumnDataType::Float64 as i32,
            ..Default::default()
        };

        let mem_vals = Values {
            f64_values: vec![0.1],
            ..Default::default()
        };
        let mem_column = Column {
            column_name: "memory".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(mem_vals),
            null_mask: vec![1],
            datatype: ColumnDataType::Float64 as i32,
            ..Default::default()
        };

        let time_vals = Values {
            time_millisecond_values: vec![100, 101],
            ..Default::default()
        };
        let time_column = Column {
            column_name: "time".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(time_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::TimeMillisecond as i32,
            ..Default::default()
        };

        let interval1 = IntervalMonthDayNano {
            months: 1,
            days: 2,
            nanoseconds: 3,
        };
        let interval2 = IntervalMonthDayNano {
            months: 4,
            days: 5,
            nanoseconds: 6,
        };
        let interval_vals = Values {
            interval_month_day_nano_values: vec![interval1, interval2],
            ..Default::default()
        };
        let interval_column = Column {
            column_name: "interval".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(interval_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::IntervalMonthDayNano as i32,
            ..Default::default()
        };

        let ts_vals = Values {
            timestamp_millisecond_values: vec![100, 101],
            ..Default::default()
        };
        let ts_column = Column {
            column_name: "ts".to_string(),
            semantic_type: SemanticType::Timestamp as i32,
            values: Some(ts_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::TimestampMillisecond as i32,
            ..Default::default()
        };
        let decimal_vals = Values {
            decimal128_values: vec![Decimal128 { hi: 0, lo: 123 }, Decimal128 { hi: 0, lo: 456 }],
            ..Default::default()
        };
        let decimal_column = Column {
            column_name: "decimals".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(decimal_vals),
            null_mask: vec![0],
            datatype: ColumnDataType::Decimal128 as i32,
            datatype_extension: Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension {
                    precision: 38,
                    scale: 10,
                })),
            }),
            options: None,
        };

        (
            vec![
                host_column,
                cpu_column,
                mem_column,
                time_column,
                interval_column,
                ts_column,
                decimal_column,
            ],
            row_count,
        )
    }
}