1use std::collections::HashSet;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::column_def::{contains_fulltext, contains_skipping};
19use api::v1::{
20 AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef,
21 ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType,
22};
23use datatypes::schema::Schema;
24use snafu::{ensure, OptionExt, ResultExt};
25use table::metadata::TableId;
26use table::table_reference::TableReference;
27
28use crate::error::{
29 self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu,
30 InvalidStringIndexColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
31 UnknownColumnDataTypeSnafu,
32};
33pub struct ColumnExpr<'a> {
34 pub column_name: &'a str,
35 pub datatype: i32,
36 pub semantic_type: i32,
37 pub datatype_extension: &'a Option<ColumnDataTypeExtension>,
38 pub options: &'a Option<ColumnOptions>,
39}
40
41impl<'a> ColumnExpr<'a> {
42 #[inline]
43 pub fn from_columns(columns: &'a [Column]) -> Vec<Self> {
44 columns.iter().map(Self::from).collect()
45 }
46
47 #[inline]
48 pub fn from_column_schemas(schemas: &'a [ColumnSchema]) -> Vec<Self> {
49 schemas.iter().map(Self::from).collect()
50 }
51}
52
53impl<'a> From<&'a Column> for ColumnExpr<'a> {
54 fn from(column: &'a Column) -> Self {
55 Self {
56 column_name: &column.column_name,
57 datatype: column.datatype,
58 semantic_type: column.semantic_type,
59 datatype_extension: &column.datatype_extension,
60 options: &column.options,
61 }
62 }
63}
64
65impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> {
66 fn from(schema: &'a ColumnSchema) -> Self {
67 Self {
68 column_name: &schema.column_name,
69 datatype: schema.datatype,
70 semantic_type: schema.semantic_type,
71 datatype_extension: &schema.datatype_extension,
72 options: &schema.options,
73 }
74 }
75}
76
77fn infer_column_datatype(
78 datatype: i32,
79 datatype_extension: &Option<ColumnDataTypeExtension>,
80) -> Result<ColumnDataType> {
81 let column_type =
82 ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?;
83
84 if matches!(&column_type, ColumnDataType::Binary) {
85 if let Some(ext) = datatype_extension {
86 let type_ext = ext
87 .type_ext
88 .as_ref()
89 .context(error::MissingFieldSnafu { field: "type_ext" })?;
90 if *type_ext == TypeExt::JsonType(JsonTypeExtension::JsonBinary.into()) {
91 return Ok(ColumnDataType::Json);
92 }
93 }
94 }
95
96 Ok(column_type)
97}
98
99pub fn build_create_table_expr(
100 table_id: Option<TableId>,
101 table_name: &TableReference<'_>,
102 column_exprs: Vec<ColumnExpr>,
103 engine: &str,
104 desc: &str,
105) -> Result<CreateTableExpr> {
106 let mut distinct_names = HashSet::with_capacity(column_exprs.len());
114 for ColumnExpr { column_name, .. } in &column_exprs {
115 ensure!(
116 distinct_names.insert(*column_name),
117 DuplicatedColumnNameSnafu { name: *column_name }
118 );
119 }
120
121 let mut column_defs = Vec::with_capacity(column_exprs.len());
122 let mut primary_keys = Vec::with_capacity(column_exprs.len());
123 let mut time_index = None;
124
125 for expr in column_exprs {
126 let ColumnExpr {
127 column_name,
128 datatype,
129 semantic_type,
130 datatype_extension,
131 options,
132 } = expr;
133
134 let mut is_nullable = true;
135 match semantic_type {
136 v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.to_owned()),
137 v if v == SemanticType::Timestamp as i32 => {
138 ensure!(
139 time_index.is_none(),
140 DuplicatedTimestampColumnSnafu {
141 exists: time_index.as_ref().unwrap(),
142 duplicated: column_name,
143 }
144 );
145 time_index = Some(column_name.to_owned());
146 is_nullable = false;
148 }
149 _ => {}
150 }
151
152 let column_type = infer_column_datatype(datatype, datatype_extension)?;
153
154 ensure!(
155 (!contains_fulltext(options) && !contains_skipping(options))
156 || column_type == ColumnDataType::String,
157 InvalidStringIndexColumnTypeSnafu {
158 column_name,
159 column_type,
160 }
161 );
162
163 column_defs.push(ColumnDef {
164 name: column_name.to_owned(),
165 data_type: datatype,
166 is_nullable,
167 default_constraint: vec![],
168 semantic_type,
169 comment: String::new(),
170 datatype_extension: *datatype_extension,
171 options: options.clone(),
172 });
173 }
174
175 let time_index = time_index.context(MissingTimestampColumnSnafu {
176 msg: format!("table is {}", table_name.table),
177 })?;
178
179 Ok(CreateTableExpr {
180 catalog_name: table_name.catalog.to_string(),
181 schema_name: table_name.schema.to_string(),
182 table_name: table_name.table.to_string(),
183 desc: desc.to_string(),
184 column_defs,
185 time_index,
186 primary_keys,
187 create_if_not_exists: true,
188 table_options: Default::default(),
189 table_id: table_id.map(|id| api::v1::TableId { id }),
190 engine: engine.to_string(),
191 })
192}
193
194pub fn extract_new_columns(
198 schema: &Schema,
199 column_exprs: Vec<ColumnExpr>,
200) -> Result<Option<AddColumns>> {
201 let columns_to_add = column_exprs
202 .into_iter()
203 .filter(|expr| schema.column_schema_by_name(expr.column_name).is_none())
204 .map(|expr| {
205 let column_def = Some(ColumnDef {
206 name: expr.column_name.to_string(),
207 data_type: expr.datatype,
208 is_nullable: true,
209 default_constraint: vec![],
210 semantic_type: expr.semantic_type,
211 comment: String::new(),
212 datatype_extension: *expr.datatype_extension,
213 options: expr.options.clone(),
214 });
215 AddColumn {
216 column_def,
217 location: None,
218 add_if_not_exists: true,
219 }
220 })
221 .collect::<Vec<_>>();
222
223 if columns_to_add.is_empty() {
224 Ok(None)
225 } else {
226 let mut distinct_names = HashSet::with_capacity(columns_to_add.len());
227 for add_column in &columns_to_add {
228 let name = add_column.column_def.as_ref().unwrap().name.as_str();
229 ensure!(
230 distinct_names.insert(name),
231 DuplicatedColumnNameSnafu { name }
232 );
233 }
234
235 Ok(Some(AddColumns {
236 add_columns: columns_to_add,
237 }))
238 }
239}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{assert_eq, vec};

    use api::helper::ColumnDataTypeWrapper;
    use api::v1::column::Values;
    use api::v1::column_data_type_extension::TypeExt;
    use api::v1::{
        Column, ColumnDataType, ColumnDataTypeExtension, Decimal128, DecimalTypeExtension,
        IntervalMonthDayNano, SemanticType,
    };
    use common_catalog::consts::MITO_ENGINE;
    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SchemaBuilder};
    use snafu::ResultExt;

    use super::*;
    use crate::error;
    use crate::error::ColumnDataTypeSnafu;

    /// Builds a column schema from a raw `ColumnDataType` value.
    #[inline]
    fn build_column_schema(
        column_name: &str,
        datatype: i32,
        nullable: bool,
    ) -> error::Result<ColumnSchema> {
        let datatype_wrapper =
            ColumnDataTypeWrapper::try_new(datatype, None).context(ColumnDataTypeSnafu)?;

        Ok(ColumnSchema::new(
            column_name,
            datatype_wrapper.into(),
            nullable,
        ))
    }

    /// Builds a create-table expression from the columns of an insertion.
    fn build_create_expr_from_insertion(
        catalog_name: &str,
        schema_name: &str,
        table_id: Option<TableId>,
        table_name: &str,
        columns: &[Column],
        engine: &str,
    ) -> Result<CreateTableExpr> {
        let table_name = TableReference::full(catalog_name, schema_name, table_name);
        let column_exprs = ColumnExpr::from_columns(columns);
        build_create_table_expr(
            table_id,
            &table_name,
            column_exprs,
            engine,
            "Created on insertion",
        )
    }

    /// Resolves the concrete data type described by a column definition,
    /// taking its type extension (if any) into account.
    fn concrete_type_of(column_def: &ColumnDef) -> ConcreteDataType {
        ConcreteDataType::from(
            ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)
                .unwrap(),
        )
    }

    #[test]
    fn test_build_create_table_request() {
        let table_id = Some(10);
        let table_name = "test_metric";

        // Without any column there is no time index, so building must fail.
        assert!(
            build_create_expr_from_insertion("", "", table_id, table_name, &[], MITO_ENGINE)
                .is_err()
        );

        let insert_batch = mock_insert_batch();

        let create_expr = build_create_expr_from_insertion(
            "",
            "",
            table_id,
            table_name,
            &insert_batch.0,
            MITO_ENGINE,
        )
        .unwrap();

        assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
        assert_eq!(table_name, create_expr.table_name);
        assert_eq!("Created on insertion".to_string(), create_expr.desc);
        assert_eq!(
            vec![create_expr.column_defs[0].name.clone()],
            create_expr.primary_keys
        );

        let column_defs = create_expr.column_defs;
        assert_eq!(column_defs[5].name, create_expr.time_index);
        assert_eq!(7, column_defs.len());

        let expected_types = vec![
            ("host", ConcreteDataType::string_datatype()),
            ("cpu", ConcreteDataType::float64_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("ts", ConcreteDataType::timestamp_millisecond_datatype()),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        for (name, expected) in expected_types {
            let column_def = column_defs.iter().find(|c| c.name == name).unwrap();
            assert_eq!(expected, concrete_type_of(column_def), "column {}", name);
        }
    }

    #[test]
    fn test_find_new_columns() {
        let cpu_column = build_column_schema("cpu", 10, true).unwrap();
        let ts_column = build_column_schema("ts", 15, false)
            .unwrap()
            .with_time_index(true);
        let columns = vec![cpu_column, ts_column];

        let schema = Arc::new(SchemaBuilder::try_from(columns).unwrap().build().unwrap());

        assert!(extract_new_columns(&schema, ColumnExpr::from_columns(&[]))
            .unwrap()
            .is_none());

        let insert_batch = mock_insert_batch();

        let add_columns = extract_new_columns(&schema, ColumnExpr::from_columns(&insert_batch.0))
            .unwrap()
            .unwrap();

        // `cpu` and `ts` already exist in the schema, so only the remaining
        // five columns of the batch are expected, in batch order.
        let expected = vec![
            ("host", ConcreteDataType::string_datatype()),
            ("memory", ConcreteDataType::float64_datatype()),
            (
                "time",
                ConcreteDataType::time_datatype(TimeUnit::Millisecond),
            ),
            (
                "interval",
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            ("decimals", ConcreteDataType::decimal128_datatype(38, 10)),
        ];
        assert_eq!(5, add_columns.add_columns.len());

        for (add_column, (name, expected_type)) in add_columns.add_columns.iter().zip(expected) {
            let column_def = add_column.column_def.as_ref().unwrap();
            assert_eq!(name, column_def.name);
            assert_eq!(expected_type, concrete_type_of(column_def), "column {}", name);
            // Check the flag on the column under inspection, not only on the
            // first one.
            assert!(add_column.add_if_not_exists, "column {}", name);
        }
    }

    /// Returns a batch of seven columns (one tag, five fields and one
    /// timestamp) together with its row count.
    fn mock_insert_batch() -> (Vec<Column>, u32) {
        let row_count = 2;

        let host_column = Column {
            column_name: "host".to_string(),
            semantic_type: SemanticType::Tag as i32,
            values: Some(Values {
                string_values: vec!["host1".to_string(), "host2".to_string()],
                ..Default::default()
            }),
            null_mask: vec![0],
            datatype: ColumnDataType::String as i32,
            ..Default::default()
        };

        let cpu_column = Column {
            column_name: "cpu".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(Values {
                f64_values: vec![0.31],
                ..Default::default()
            }),
            null_mask: vec![2],
            datatype: ColumnDataType::Float64 as i32,
            ..Default::default()
        };

        let mem_column = Column {
            column_name: "memory".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(Values {
                f64_values: vec![0.1],
                ..Default::default()
            }),
            null_mask: vec![1],
            datatype: ColumnDataType::Float64 as i32,
            ..Default::default()
        };

        let time_column = Column {
            column_name: "time".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(Values {
                time_millisecond_values: vec![100, 101],
                ..Default::default()
            }),
            null_mask: vec![0],
            datatype: ColumnDataType::TimeMillisecond as i32,
            ..Default::default()
        };

        let interval_column = Column {
            column_name: "interval".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(Values {
                interval_month_day_nano_values: vec![
                    IntervalMonthDayNano {
                        months: 1,
                        days: 2,
                        nanoseconds: 3,
                    },
                    IntervalMonthDayNano {
                        months: 4,
                        days: 5,
                        nanoseconds: 6,
                    },
                ],
                ..Default::default()
            }),
            null_mask: vec![0],
            datatype: ColumnDataType::IntervalMonthDayNano as i32,
            ..Default::default()
        };

        let ts_column = Column {
            column_name: "ts".to_string(),
            semantic_type: SemanticType::Timestamp as i32,
            values: Some(Values {
                timestamp_millisecond_values: vec![100, 101],
                ..Default::default()
            }),
            null_mask: vec![0],
            datatype: ColumnDataType::TimestampMillisecond as i32,
            ..Default::default()
        };

        let decimal_column = Column {
            column_name: "decimals".to_string(),
            semantic_type: SemanticType::Field as i32,
            values: Some(Values {
                decimal128_values: vec![
                    Decimal128 { hi: 0, lo: 123 },
                    Decimal128 { hi: 0, lo: 456 },
                ],
                ..Default::default()
            }),
            null_mask: vec![0],
            datatype: ColumnDataType::Decimal128 as i32,
            datatype_extension: Some(ColumnDataTypeExtension {
                type_ext: Some(TypeExt::DecimalType(DecimalTypeExtension {
                    precision: 38,
                    scale: 10,
                })),
            }),
            options: None,
        };

        (
            vec![
                host_column,
                cpu_column,
                mem_column,
                time_column,
                interval_column,
                ts_column,
                decimal_column,
            ],
            row_count,
        )
    }
}