1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39 UnsetIndexOptions,
40};
41
/// Numeric identifier of a table.
pub type TableId = u32;
/// Version counter of a table, stored in [`TableIdent::version`].
pub type TableVersion = u64;
44
45#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
48pub enum FilterPushDownType {
49 Unsupported,
51 Inexact,
56 Exact,
60}
61
62impl From<TableProviderFilterPushDown> for FilterPushDownType {
63 fn from(value: TableProviderFilterPushDown) -> Self {
64 match value {
65 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
66 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
67 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
68 }
69 }
70}
71
72impl From<FilterPushDownType> for TableProviderFilterPushDown {
73 fn from(value: FilterPushDownType) -> Self {
74 match value {
75 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
76 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
77 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
78 }
79 }
80}
81
82#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
84pub enum TableType {
85 Base,
87 View,
89 Temporary,
91}
92
93impl std::fmt::Display for TableType {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 TableType::Base => f.write_str("BASE TABLE"),
97 TableType::Temporary => f.write_str("TEMPORARY"),
98 TableType::View => f.write_str("VIEW"),
99 }
100 }
101}
102
103#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
105pub struct TableIdent {
106 pub table_id: TableId,
108 pub version: TableVersion,
111}
112
113#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
117#[builder(pattern = "mutable", custom_constructor)]
118pub struct TableMeta {
119 pub schema: SchemaRef,
120 pub primary_key_indices: Vec<usize>,
123 #[builder(default = "self.default_value_indices()?")]
124 pub value_indices: Vec<usize>,
125 #[builder(default, setter(into))]
126 pub engine: String,
127 #[builder(default, setter(into))]
128 pub region_numbers: Vec<u32>,
129 pub next_column_id: ColumnId,
130 #[builder(default)]
132 pub options: TableOptions,
133 #[builder(default = "Utc::now()")]
134 pub created_on: DateTime<Utc>,
135 #[builder(default = "Vec::new()")]
136 pub partition_key_indices: Vec<usize>,
137}
138
139impl TableMetaBuilder {
140 #[cfg(any(test, feature = "testing"))]
142 pub fn empty() -> Self {
143 Self {
144 schema: None,
145 primary_key_indices: None,
146 value_indices: None,
147 engine: None,
148 region_numbers: None,
149 next_column_id: None,
150 options: None,
151 created_on: None,
152 partition_key_indices: None,
153 }
154 }
155}
156
157impl TableMetaBuilder {
158 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
159 match (&self.primary_key_indices, &self.schema) {
160 (Some(v), Some(schema)) => {
161 let column_schemas = schema.column_schemas();
162 Ok((0..column_schemas.len())
163 .filter(|idx| !v.contains(idx))
164 .collect())
165 }
166 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
167 }
168 }
169
170 pub fn new_external_table() -> Self {
171 Self {
172 schema: None,
173 primary_key_indices: Some(Vec::new()),
174 value_indices: Some(Vec::new()),
175 engine: None,
176 region_numbers: Some(Vec::new()),
177 next_column_id: Some(0),
178 options: None,
179 created_on: None,
180 partition_key_indices: None,
181 }
182 }
183}
184
185struct SplitResult<'a> {
187 columns_at_first: Vec<&'a AddColumnRequest>,
189 columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
191 columns_at_last: Vec<&'a AddColumnRequest>,
193 column_names: Vec<String>,
195}
196
197impl TableMeta {
198 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
199 let columns_schemas = &self.schema.column_schemas();
200 self.primary_key_indices
201 .iter()
202 .map(|idx| &columns_schemas[*idx].name)
203 }
204
205 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
206 let columns_schemas = self.schema.column_schemas();
208 let primary_key_indices = &self.primary_key_indices;
209 columns_schemas
210 .iter()
211 .enumerate()
212 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
213 .map(|(_, cs)| &cs.name)
214 }
215
216 pub fn builder_with_alter_kind(
220 &self,
221 table_name: &str,
222 alter_kind: &AlterKind,
223 ) -> Result<TableMetaBuilder> {
224 match alter_kind {
225 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
226 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
227 AlterKind::ModifyColumnTypes { columns } => {
228 self.modify_column_types(table_name, columns)
229 }
230 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
232 AlterKind::SetTableOptions { options } => self.set_table_options(options),
233 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
234 AlterKind::SetIndex { options } => match options {
235 SetIndexOptions::Fulltext {
236 column_name,
237 options,
238 } => self.change_column_fulltext_options(
239 table_name,
240 column_name,
241 true,
242 Some(options),
243 ),
244 SetIndexOptions::Inverted { column_name } => {
245 self.change_column_modify_inverted_index(table_name, column_name, true)
246 }
247 SetIndexOptions::Skipping {
248 column_name,
249 options,
250 } => self.change_column_skipping_index_options(
251 table_name,
252 column_name,
253 Some(options),
254 ),
255 },
256 AlterKind::UnsetIndex { options } => match options {
257 UnsetIndexOptions::Fulltext { column_name } => {
258 self.change_column_fulltext_options(table_name, column_name, false, None)
259 }
260 UnsetIndexOptions::Inverted { column_name } => {
261 self.change_column_modify_inverted_index(table_name, column_name, false)
262 }
263 UnsetIndexOptions::Skipping { column_name } => {
264 self.change_column_skipping_index_options(table_name, column_name, None)
265 }
266 },
267 }
268 }
269
270 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
272 let mut new_options = self.options.clone();
273
274 for request in requests {
275 match request {
276 SetRegionOption::Ttl(new_ttl) => {
277 new_options.ttl = *new_ttl;
278 }
279 SetRegionOption::Twsc(key, value) => {
280 if !value.is_empty() {
281 new_options
282 .extra_options
283 .insert(key.to_string(), value.to_string());
284 new_options.extra_options.insert(
286 COMPACTION_TYPE.to_string(),
287 COMPACTION_TYPE_TWCS.to_string(),
288 );
289 } else {
290 new_options.extra_options.remove(key.as_str());
292 }
293 }
294 }
295 }
296 let mut builder = self.new_meta_builder();
297 builder.options(new_options);
298
299 Ok(builder)
300 }
301
302 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
303 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
304 self.set_table_options(&requests)
305 }
306
307 fn change_column_modify_inverted_index(
309 &self,
310 table_name: &str,
311 column_name: &str,
312 value: bool,
313 ) -> Result<TableMetaBuilder> {
314 let table_schema = &self.schema;
315 let mut meta_builder = self.new_meta_builder();
316
317 let mut columns: Vec<ColumnSchema> =
318 Vec::with_capacity(table_schema.column_schemas().len());
319
320 for column_schema in table_schema.column_schemas().iter() {
321 if column_schema.name == column_name {
322 let mut new_column_schema = column_schema.clone();
323 new_column_schema.set_inverted_index(value);
324 columns.push(new_column_schema);
325 } else {
326 columns.push(column_schema.clone());
327 }
328 }
329
330 let mut builder = SchemaBuilder::try_from_columns(columns)
332 .with_context(|_| error::SchemaBuildSnafu {
333 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
334 })?
335 .version(table_schema.version() + 1);
336
337 for (k, v) in table_schema.metadata().iter() {
338 builder = builder.add_metadata(k, v);
339 }
340
341 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
342 msg: format!(
343 "Table {table_name} cannot change fulltext options for column {column_name}",
344 ),
345 })?;
346
347 let _ = meta_builder
348 .schema(Arc::new(new_schema))
349 .primary_key_indices(self.primary_key_indices.clone());
350
351 Ok(meta_builder)
352 }
353
354 fn change_column_fulltext_options(
356 &self,
357 table_name: &str,
358 column_name: &str,
359 enable: bool,
360 options: Option<&FulltextOptions>,
361 ) -> Result<TableMetaBuilder> {
362 let table_schema = &self.schema;
363 let mut meta_builder = self.new_meta_builder();
364
365 let column = &table_schema
366 .column_schema_by_name(column_name)
367 .with_context(|| error::ColumnNotExistsSnafu {
368 column_name,
369 table_name,
370 })?;
371
372 ensure!(
373 column.data_type.is_string(),
374 error::InvalidColumnOptionSnafu {
375 column_name,
376 msg: "FULLTEXT index only supports string type",
377 }
378 );
379
380 let current_fulltext_options = column
381 .fulltext_options()
382 .context(error::SetFulltextOptionsSnafu { column_name })?;
383
384 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
385 for column_schema in table_schema.column_schemas() {
386 if column_schema.name == column_name {
387 let mut new_column_schema = column_schema.clone();
388 if enable {
389 ensure!(
390 options.is_some(),
391 error::InvalidColumnOptionSnafu {
392 column_name,
393 msg: "FULLTEXT index options must be provided",
394 }
395 );
396 set_column_fulltext_options(
397 &mut new_column_schema,
398 column_name,
399 options.unwrap(),
400 current_fulltext_options.clone(),
401 )?
402 } else {
403 unset_column_fulltext_options(
404 &mut new_column_schema,
405 column_name,
406 current_fulltext_options.clone(),
407 )?
408 }
409 columns.push(new_column_schema);
410 } else {
411 columns.push(column_schema.clone());
412 }
413 }
414
415 let mut builder = SchemaBuilder::try_from_columns(columns)
417 .with_context(|_| error::SchemaBuildSnafu {
418 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
419 })?
420 .version(table_schema.version() + 1);
421
422 for (k, v) in table_schema.metadata().iter() {
423 builder = builder.add_metadata(k, v);
424 }
425
426 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
427 msg: format!(
428 "Table {table_name} cannot change fulltext options for column {column_name}",
429 ),
430 })?;
431
432 let _ = meta_builder
433 .schema(Arc::new(new_schema))
434 .primary_key_indices(self.primary_key_indices.clone());
435
436 Ok(meta_builder)
437 }
438
439 fn change_column_skipping_index_options(
441 &self,
442 table_name: &str,
443 column_name: &str,
444 options: Option<&SkippingIndexOptions>,
445 ) -> Result<TableMetaBuilder> {
446 let table_schema = &self.schema;
447 let mut meta_builder = self.new_meta_builder();
448
449 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
450 for column_schema in table_schema.column_schemas() {
451 if column_schema.name == column_name {
452 let mut new_column_schema = column_schema.clone();
453 if let Some(options) = options {
454 set_column_skipping_index_options(
455 &mut new_column_schema,
456 column_name,
457 options,
458 )?;
459 } else {
460 unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
461 }
462 columns.push(new_column_schema);
463 } else {
464 columns.push(column_schema.clone());
465 }
466 }
467
468 let mut builder = SchemaBuilder::try_from_columns(columns)
469 .with_context(|_| error::SchemaBuildSnafu {
470 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
471 })?
472 .version(table_schema.version() + 1);
473
474 for (k, v) in table_schema.metadata().iter() {
475 builder = builder.add_metadata(k, v);
476 }
477
478 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
479 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
480 })?;
481
482 let _ = meta_builder
483 .schema(Arc::new(new_schema))
484 .primary_key_indices(self.primary_key_indices.clone());
485
486 Ok(meta_builder)
487 }
488
489 pub fn alloc_new_column(
494 &mut self,
495 table_name: &str,
496 new_column: &ColumnSchema,
497 ) -> Result<ColumnDescriptor> {
498 let desc = ColumnDescriptorBuilder::new(
499 self.next_column_id as ColumnId,
500 &new_column.name,
501 new_column.data_type.clone(),
502 )
503 .is_nullable(new_column.is_nullable())
504 .default_constraint(new_column.default_constraint().cloned())
505 .build()
506 .context(error::BuildColumnDescriptorSnafu {
507 table_name,
508 column_name: &new_column.name,
509 })?;
510
511 self.next_column_id += 1;
513
514 Ok(desc)
515 }
516
517 fn new_meta_builder(&self) -> TableMetaBuilder {
519 let mut builder = TableMetaBuilder::from(self);
520 builder.value_indices = None;
522 builder
523 }
524
525 fn add_columns(
527 &self,
528 table_name: &str,
529 requests: &[AddColumnRequest],
530 ) -> Result<TableMetaBuilder> {
531 let table_schema = &self.schema;
532 let mut meta_builder = self.new_meta_builder();
533 let original_primary_key_indices: HashSet<&usize> =
534 self.primary_key_indices.iter().collect();
535
536 let mut names = HashSet::with_capacity(requests.len());
537 let mut new_columns = Vec::with_capacity(requests.len());
538 for col_to_add in requests {
539 if let Some(column_schema) =
540 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
541 {
542 ensure!(
544 col_to_add.add_if_not_exists,
545 error::ColumnExistsSnafu {
546 table_name,
547 column_name: &col_to_add.column_schema.name
548 },
549 );
550
551 ensure!(
553 column_schema.data_type == col_to_add.column_schema.data_type,
554 error::InvalidAlterRequestSnafu {
555 table: table_name,
556 err: format!(
557 "column {} already exists with different type {:?}",
558 col_to_add.column_schema.name, column_schema.data_type,
559 ),
560 }
561 );
562 } else {
563 ensure!(
566 names.insert(&col_to_add.column_schema.name),
567 error::InvalidAlterRequestSnafu {
568 table: table_name,
569 err: format!(
570 "add column {} more than once",
571 col_to_add.column_schema.name
572 ),
573 }
574 );
575
576 ensure!(
577 col_to_add.column_schema.is_nullable()
578 || col_to_add.column_schema.default_constraint().is_some(),
579 error::InvalidAlterRequestSnafu {
580 table: table_name,
581 err: format!(
582 "no default value for column {}",
583 col_to_add.column_schema.name
584 ),
585 },
586 );
587
588 new_columns.push(col_to_add.clone());
589 }
590 }
591 let requests = &new_columns[..];
592
593 let SplitResult {
594 columns_at_first,
595 columns_at_after,
596 columns_at_last,
597 column_names,
598 } = self.split_requests_by_column_location(table_name, requests)?;
599 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
600 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
601 columns_at_first.iter().rev().for_each(|request| {
603 if request.is_key {
604 primary_key_indices.push(columns.len());
606 }
607 columns.push(request.column_schema.clone());
608 });
609 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
611 if original_primary_key_indices.contains(&index) {
612 primary_key_indices.push(columns.len());
613 }
614 columns.push(column_schema.clone());
615 if let Some(requests) = columns_at_after.get(&column_schema.name) {
616 requests.iter().rev().for_each(|request| {
617 if request.is_key {
618 primary_key_indices.push(columns.len());
620 }
621 columns.push(request.column_schema.clone());
622 });
623 }
624 }
625 columns_at_last.iter().for_each(|request| {
627 if request.is_key {
628 primary_key_indices.push(columns.len());
630 }
631 columns.push(request.column_schema.clone());
632 });
633
634 let mut builder = SchemaBuilder::try_from(columns)
635 .with_context(|_| error::SchemaBuildSnafu {
636 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
637 })?
638 .version(table_schema.version() + 1);
640 for (k, v) in table_schema.metadata().iter() {
641 builder = builder.add_metadata(k, v);
642 }
643 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
644 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
645 })?;
646
647 let _ = meta_builder
649 .schema(Arc::new(new_schema))
650 .primary_key_indices(primary_key_indices);
651
652 Ok(meta_builder)
653 }
654
655 fn remove_columns(
656 &self,
657 table_name: &str,
658 column_names: &[String],
659 ) -> Result<TableMetaBuilder> {
660 let table_schema = &self.schema;
661 let column_names: HashSet<_> = column_names.iter().collect();
662 let mut meta_builder = self.new_meta_builder();
663
664 let timestamp_index = table_schema.timestamp_index();
665 for column_name in &column_names {
667 if let Some(index) = table_schema.column_index_by_name(column_name) {
668 ensure!(
671 !self.primary_key_indices.contains(&index),
672 error::RemoveColumnInIndexSnafu {
673 column_name: *column_name,
674 table_name,
675 }
676 );
677
678 if let Some(ts_index) = timestamp_index {
679 ensure!(
681 index != ts_index,
682 error::RemoveColumnInIndexSnafu {
683 column_name: table_schema.column_name_by_index(ts_index),
684 table_name,
685 }
686 );
687 }
688 } else {
689 return error::ColumnNotExistsSnafu {
690 column_name: *column_name,
691 table_name,
692 }
693 .fail()?;
694 }
695 }
696
697 let columns: Vec<_> = table_schema
699 .column_schemas()
700 .iter()
701 .filter(|column_schema| !column_names.contains(&column_schema.name))
702 .cloned()
703 .collect();
704
705 let mut builder = SchemaBuilder::try_from_columns(columns)
706 .with_context(|_| error::SchemaBuildSnafu {
707 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
708 })?
709 .version(table_schema.version() + 1);
711 for (k, v) in table_schema.metadata().iter() {
712 builder = builder.add_metadata(k, v);
713 }
714 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
715 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
716 })?;
717
718 let primary_key_indices = self
720 .primary_key_indices
721 .iter()
722 .map(|idx| table_schema.column_name_by_index(*idx))
723 .map(|name| new_schema.column_index_by_name(name).unwrap())
725 .collect();
726
727 let _ = meta_builder
728 .schema(Arc::new(new_schema))
729 .primary_key_indices(primary_key_indices);
730
731 Ok(meta_builder)
732 }
733
734 fn modify_column_types(
735 &self,
736 table_name: &str,
737 requests: &[ModifyColumnTypeRequest],
738 ) -> Result<TableMetaBuilder> {
739 let table_schema = &self.schema;
740 let mut meta_builder = self.new_meta_builder();
741
742 let mut modify_column_types = HashMap::with_capacity(requests.len());
743 let timestamp_index = table_schema.timestamp_index();
744
745 for col_to_change in requests {
746 let change_column_name = &col_to_change.column_name;
747
748 let index = table_schema
749 .column_index_by_name(change_column_name)
750 .with_context(|| error::ColumnNotExistsSnafu {
751 column_name: change_column_name,
752 table_name,
753 })?;
754
755 let column = &table_schema.column_schemas()[index];
756
757 ensure!(
758 !self.primary_key_indices.contains(&index),
759 error::InvalidAlterRequestSnafu {
760 table: table_name,
761 err: format!(
762 "Not allowed to change primary key index column '{}'",
763 column.name
764 )
765 }
766 );
767
768 if let Some(ts_index) = timestamp_index {
769 ensure!(
771 index != ts_index,
772 error::InvalidAlterRequestSnafu {
773 table: table_name,
774 err: format!(
775 "Not allowed to change timestamp index column '{}' datatype",
776 column.name
777 )
778 }
779 );
780 }
781
782 ensure!(
783 modify_column_types
784 .insert(&col_to_change.column_name, col_to_change)
785 .is_none(),
786 error::InvalidAlterRequestSnafu {
787 table: table_name,
788 err: format!(
789 "change column datatype {} more than once",
790 col_to_change.column_name
791 ),
792 }
793 );
794
795 ensure!(
796 column
797 .data_type
798 .can_arrow_type_cast_to(&col_to_change.target_type),
799 error::InvalidAlterRequestSnafu {
800 table: table_name,
801 err: format!(
802 "column '{}' cannot be cast automatically to type '{}'",
803 col_to_change.column_name, col_to_change.target_type,
804 ),
805 }
806 );
807
808 ensure!(
809 column.is_nullable(),
810 error::InvalidAlterRequestSnafu {
811 table: table_name,
812 err: format!(
813 "column '{}' must be nullable to ensure safe conversion.",
814 col_to_change.column_name,
815 ),
816 }
817 );
818 }
819 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
822 for mut column in table_schema.column_schemas().iter().cloned() {
823 if let Some(change_column) = modify_column_types.get(&column.name) {
824 column.data_type = change_column.target_type.clone();
825 let new_default = if let Some(default_value) = column.default_constraint() {
826 Some(
827 default_value
828 .cast_to_datatype(&change_column.target_type)
829 .with_context(|_| error::CastDefaultValueSnafu {
830 reason: format!(
831 "Failed to cast default value from {:?} to type {:?}",
832 default_value, &change_column.target_type
833 ),
834 })?,
835 )
836 } else {
837 None
838 };
839 column = column
840 .clone()
841 .with_default_constraint(new_default.clone())
842 .with_context(|_| error::CastDefaultValueSnafu {
843 reason: format!("Failed to set new default: {:?}", new_default),
844 })?;
845 }
846 columns.push(column)
847 }
848
849 let mut builder = SchemaBuilder::try_from_columns(columns)
850 .with_context(|_| error::SchemaBuildSnafu {
851 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
852 })?
853 .version(table_schema.version() + 1);
855 for (k, v) in table_schema.metadata().iter() {
856 builder = builder.add_metadata(k, v);
857 }
858 let new_schema = builder.build().with_context(|_| {
859 let column_names: Vec<_> = requests
860 .iter()
861 .map(|request| &request.column_name)
862 .collect();
863
864 error::SchemaBuildSnafu {
865 msg: format!(
866 "Table {table_name} cannot change datatype with columns {column_names:?}"
867 ),
868 }
869 })?;
870
871 let _ = meta_builder
872 .schema(Arc::new(new_schema))
873 .primary_key_indices(self.primary_key_indices.clone());
874
875 Ok(meta_builder)
876 }
877
878 fn split_requests_by_column_location<'a>(
880 &self,
881 table_name: &str,
882 requests: &'a [AddColumnRequest],
883 ) -> Result<SplitResult<'a>> {
884 let table_schema = &self.schema;
885 let mut columns_at_first = Vec::new();
886 let mut columns_at_after = HashMap::new();
887 let mut columns_at_last = Vec::new();
888 let mut column_names = Vec::with_capacity(requests.len());
889 for request in requests {
890 let column_name = &request.column_schema.name;
892 column_names.push(column_name.clone());
893 ensure!(
894 table_schema.column_schema_by_name(column_name).is_none(),
895 error::ColumnExistsSnafu {
896 column_name,
897 table_name,
898 }
899 );
900 match request.location.as_ref() {
901 Some(AddColumnLocation::First) => {
902 columns_at_first.push(request);
903 }
904 Some(AddColumnLocation::After { column_name }) => {
905 ensure!(
906 table_schema.column_schema_by_name(column_name).is_some(),
907 error::ColumnNotExistsSnafu {
908 column_name,
909 table_name,
910 }
911 );
912 columns_at_after
913 .entry(column_name.clone())
914 .or_insert(Vec::new())
915 .push(request);
916 }
917 None => {
918 columns_at_last.push(request);
919 }
920 }
921 }
922 Ok(SplitResult {
923 columns_at_first,
924 columns_at_after,
925 columns_at_last,
926 column_names,
927 })
928 }
929}
930
931#[derive(Clone, Debug, PartialEq, Eq, Builder)]
932#[builder(pattern = "owned")]
933pub struct TableInfo {
934 #[builder(default, setter(into))]
936 pub ident: TableIdent,
937 #[builder(setter(into))]
939 pub name: String,
940 #[builder(default, setter(into))]
942 pub desc: Option<String>,
943 #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
944 pub catalog_name: String,
945 #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
946 pub schema_name: String,
947 pub meta: TableMeta,
948 #[builder(default = "TableType::Base")]
949 pub table_type: TableType,
950}
951
952pub type TableInfoRef = Arc<TableInfo>;
953
954impl TableInfo {
955 pub fn table_id(&self) -> TableId {
956 self.ident.table_id
957 }
958
959 pub fn region_ids(&self) -> Vec<RegionId> {
960 self.meta
961 .region_numbers
962 .iter()
963 .map(|id| RegionId::new(self.table_id(), *id))
964 .collect()
965 }
966 pub fn full_table_name(&self) -> String {
968 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
969 }
970
971 pub fn is_physical_table(&self) -> bool {
973 self.meta
974 .options
975 .extra_options
976 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
977 }
978
979 pub fn is_ttl_instant_table(&self) -> bool {
981 self.meta
982 .options
983 .ttl
984 .map(|t| t.is_instant())
985 .unwrap_or(false)
986 }
987}
988
989impl TableInfoBuilder {
990 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
991 Self {
992 name: Some(name.into()),
993 meta: Some(meta),
994 ..Default::default()
995 }
996 }
997
998 pub fn table_id(mut self, id: TableId) -> Self {
999 let ident = self.ident.get_or_insert_with(TableIdent::default);
1000 ident.table_id = id;
1001 self
1002 }
1003
1004 pub fn table_version(mut self, version: TableVersion) -> Self {
1005 let ident = self.ident.get_or_insert_with(TableIdent::default);
1006 ident.version = version;
1007 self
1008 }
1009}
1010
1011impl TableIdent {
1012 pub fn new(table_id: TableId) -> Self {
1013 Self {
1014 table_id,
1015 version: 0,
1016 }
1017 }
1018}
1019
1020impl From<TableId> for TableIdent {
1021 fn from(table_id: TableId) -> Self {
1022 Self::new(table_id)
1023 }
1024}
1025
1026#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1028pub struct RawTableMeta {
1029 pub schema: RawSchema,
1030 pub primary_key_indices: Vec<usize>,
1033 pub value_indices: Vec<usize>,
1035 pub engine: String,
1037 pub next_column_id: ColumnId,
1040 pub region_numbers: Vec<u32>,
1041 pub options: TableOptions,
1042 pub created_on: DateTime<Utc>,
1043 #[serde(default)]
1045 pub partition_key_indices: Vec<usize>,
1046}
1047
1048impl From<TableMeta> for RawTableMeta {
1049 fn from(meta: TableMeta) -> RawTableMeta {
1050 RawTableMeta {
1051 schema: RawSchema::from(&*meta.schema),
1052 primary_key_indices: meta.primary_key_indices,
1053 value_indices: meta.value_indices,
1054 engine: meta.engine,
1055 next_column_id: meta.next_column_id,
1056 region_numbers: meta.region_numbers,
1057 options: meta.options,
1058 created_on: meta.created_on,
1059 partition_key_indices: meta.partition_key_indices,
1060 }
1061 }
1062}
1063
1064impl TryFrom<RawTableMeta> for TableMeta {
1065 type Error = ConvertError;
1066
1067 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1068 Ok(TableMeta {
1069 schema: Arc::new(Schema::try_from(raw.schema)?),
1070 primary_key_indices: raw.primary_key_indices,
1071 value_indices: raw.value_indices,
1072 engine: raw.engine,
1073 region_numbers: raw.region_numbers,
1074 next_column_id: raw.next_column_id,
1075 options: raw.options,
1076 created_on: raw.created_on,
1077 partition_key_indices: raw.partition_key_indices,
1078 })
1079 }
1080}
1081
1082#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1084pub struct RawTableInfo {
1085 pub ident: TableIdent,
1086 pub name: String,
1087 pub desc: Option<String>,
1088 pub catalog_name: String,
1089 pub schema_name: String,
1090 pub meta: RawTableMeta,
1091 pub table_type: TableType,
1092}
1093
1094impl RawTableInfo {
1095 pub fn sort_columns(&mut self) {
1097 let column_schemas = &self.meta.schema.column_schemas;
1098 let primary_keys = self
1099 .meta
1100 .primary_key_indices
1101 .iter()
1102 .map(|index| column_schemas[*index].name.clone())
1103 .collect::<HashSet<_>>();
1104
1105 self.meta
1106 .schema
1107 .column_schemas
1108 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1109
1110 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1112 let mut timestamp_index = None;
1113 let mut value_indices =
1114 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
1115 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1116 if primary_keys.contains(&column_schema.name) {
1117 primary_key_indices.push(index);
1118 } else if column_schema.is_time_index() {
1119 timestamp_index = Some(index);
1120 } else {
1121 value_indices.push(index);
1122 }
1123 }
1124
1125 self.meta.schema.timestamp_index = timestamp_index;
1127 self.meta.primary_key_indices = primary_key_indices;
1128 self.meta.value_indices = value_indices;
1129 }
1130
1131 pub fn to_region_options(&self) -> HashMap<String, String> {
1135 HashMap::from(&self.meta.options)
1136 }
1137}
1138
1139impl From<TableInfo> for RawTableInfo {
1140 fn from(info: TableInfo) -> RawTableInfo {
1141 RawTableInfo {
1142 ident: info.ident,
1143 name: info.name,
1144 desc: info.desc,
1145 catalog_name: info.catalog_name,
1146 schema_name: info.schema_name,
1147 meta: RawTableMeta::from(info.meta),
1148 table_type: info.table_type,
1149 }
1150 }
1151}
1152
1153impl TryFrom<RawTableInfo> for TableInfo {
1154 type Error = ConvertError;
1155
1156 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1157 Ok(TableInfo {
1158 ident: raw.ident,
1159 name: raw.name,
1160 desc: raw.desc,
1161 catalog_name: raw.catalog_name,
1162 schema_name: raw.schema_name,
1163 meta: TableMeta::try_from(raw.meta)?,
1164 table_type: raw.table_type,
1165 })
1166 }
1167}
1168
1169fn set_column_fulltext_options(
1178 column_schema: &mut ColumnSchema,
1179 column_name: &str,
1180 options: &FulltextOptions,
1181 current_options: Option<FulltextOptions>,
1182) -> Result<()> {
1183 if let Some(current_options) = current_options {
1184 ensure!(
1185 current_options.analyzer == options.analyzer
1186 && current_options.case_sensitive == options.case_sensitive,
1187 error::InvalidColumnOptionSnafu {
1188 column_name,
1189 msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1190 current_options.analyzer, current_options.case_sensitive),
1191 }
1192 );
1193 }
1194
1195 column_schema
1196 .set_fulltext_options(options)
1197 .context(error::SetFulltextOptionsSnafu { column_name })?;
1198
1199 Ok(())
1200}
1201
1202fn unset_column_fulltext_options(
1203 column_schema: &mut ColumnSchema,
1204 column_name: &str,
1205 current_options: Option<FulltextOptions>,
1206) -> Result<()> {
1207 ensure!(
1208 current_options
1209 .as_ref()
1210 .is_some_and(|options| options.enable),
1211 error::InvalidColumnOptionSnafu {
1212 column_name,
1213 msg: "FULLTEXT index already disabled".to_string(),
1214 }
1215 );
1216
1217 let mut options = current_options.unwrap();
1218 options.enable = false;
1219 column_schema
1220 .set_fulltext_options(&options)
1221 .context(error::SetFulltextOptionsSnafu { column_name })?;
1222
1223 Ok(())
1224}
1225
1226fn set_column_skipping_index_options(
1227 column_schema: &mut ColumnSchema,
1228 column_name: &str,
1229 options: &SkippingIndexOptions,
1230) -> Result<()> {
1231 column_schema
1232 .set_skipping_options(options)
1233 .context(error::SetSkippingOptionsSnafu { column_name })?;
1234
1235 Ok(())
1236}
1237
1238fn unset_column_skipping_index_options(
1239 column_schema: &mut ColumnSchema,
1240 column_name: &str,
1241) -> Result<()> {
1242 column_schema
1243 .unset_skipping_options()
1244 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1245 Ok(())
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250 use common_error::ext::ErrorExt;
1251 use common_error::status_code::StatusCode;
1252 use datatypes::data_type::ConcreteDataType;
1253 use datatypes::schema::{
1254 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1255 };
1256
1257 use super::*;
1258
1259 fn new_test_schema() -> Schema {
1261 let column_schemas = vec![
1262 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1263 ColumnSchema::new(
1264 "ts",
1265 ConcreteDataType::timestamp_millisecond_datatype(),
1266 false,
1267 )
1268 .with_time_index(true),
1269 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1270 ];
1271 SchemaBuilder::try_from(column_schemas)
1272 .unwrap()
1273 .version(123)
1274 .build()
1275 .unwrap()
1276 }
1277
#[test]
fn test_raw_convert() {
    let table_meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();
    let original = TableInfoBuilder::default()
        .table_id(10)
        .table_version(5)
        .name("mytable")
        .meta(table_meta)
        .build()
        .unwrap();

    // Round trip: TableInfo -> RawTableInfo -> TableInfo must give back an equal value.
    let round_tripped = TableInfo::try_from(RawTableInfo::from(original.clone())).unwrap();

    assert_eq!(original, round_tripped);
}
1301
1302 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1303 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1304 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1305 let alter_kind = AlterKind::AddColumns {
1306 columns: vec![
1307 AddColumnRequest {
1308 column_schema: new_tag,
1309 is_key: true,
1310 location: None,
1311 add_if_not_exists: false,
1312 },
1313 AddColumnRequest {
1314 column_schema: new_field,
1315 is_key: false,
1316 location: None,
1317 add_if_not_exists: false,
1318 },
1319 ],
1320 };
1321
1322 let builder = meta
1323 .builder_with_alter_kind("my_table", &alter_kind)
1324 .unwrap();
1325 builder.build().unwrap()
1326 }
1327
1328 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1329 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1330 let new_field = ColumnSchema::new(
1331 "my_field_after_ts",
1332 ConcreteDataType::string_datatype(),
1333 true,
1334 );
1335 let alter_kind = AlterKind::AddColumns {
1336 columns: vec![
1337 AddColumnRequest {
1338 column_schema: new_tag,
1339 is_key: true,
1340 location: Some(AddColumnLocation::First),
1341 add_if_not_exists: false,
1342 },
1343 AddColumnRequest {
1344 column_schema: new_field,
1345 is_key: false,
1346 location: Some(AddColumnLocation::After {
1347 column_name: "ts".to_string(),
1348 }),
1349 add_if_not_exists: false,
1350 },
1351 ],
1352 };
1353
1354 let builder = meta
1355 .builder_with_alter_kind("my_table", &alter_kind)
1356 .unwrap();
1357 builder.build().unwrap()
1358 }
1359
#[test]
fn test_add_columns() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let altered = add_columns_to_meta(&meta);
    assert_eq!(meta.region_numbers, altered.region_numbers);

    // Gather the resulting column order by name.
    let mut column_names: Vec<String> = Vec::new();
    for column in altered.schema.column_schemas() {
        column_names.push(column.name.clone());
    }

    assert_eq!(
        &["col1", "ts", "col2", "my_tag", "my_field"],
        &column_names[..]
    );
    assert_eq!(&[0, 3], &altered.primary_key_indices[..]);
    assert_eq!(&[1, 2, 4], &altered.value_indices[..]);
}
1384
#[test]
fn test_add_columns_multiple_times() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // The very same `col3` request is listed twice within one alter request.
    let col3_request = || AddColumnRequest {
        column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
        is_key: true,
        location: None,
        add_if_not_exists: true,
    };
    let alter_kind = AlterKind::AddColumns {
        columns: vec![col3_request(), col3_request()],
    };

    let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
    let err = outcome.err().unwrap();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
1426
#[test]
fn test_remove_columns() {
    let schema = Arc::new(new_test_schema());
    let base_meta = TableMetaBuilder::empty()
        .schema(schema.clone())
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();
    // Start from the meta that additionally has `my_tag` and `my_field`.
    let meta = add_columns_to_meta(&base_meta);

    let alter_kind = AlterKind::DropColumns {
        names: vec!["col2".to_string(), "my_field".to_string()],
    };
    let builder = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .unwrap();
    let new_meta = builder.build().unwrap();

    assert_eq!(meta.region_numbers, new_meta.region_numbers);

    let remaining: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column| column.name.as_str())
        .collect();
    assert_eq!(&["col1", "ts", "my_tag"], &remaining[..]);
    assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
    assert_eq!(&[1], &new_meta.value_indices[..]);
    // Dropping columns must not change the timestamp column.
    assert_eq!(
        schema.timestamp_column(),
        new_meta.schema.timestamp_column()
    );
}
1465
#[test]
fn test_remove_multiple_columns_before_timestamp() {
    // Three int32 columns, all placed before the time index `ts`.
    let mut column_schemas: Vec<ColumnSchema> = Vec::new();
    for name in ["col1", "col2", "col3"] {
        column_schemas.push(ColumnSchema::new(
            name,
            ConcreteDataType::int32_datatype(),
            true,
        ));
    }
    column_schemas.push(
        ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true),
    );

    let schema_builder = SchemaBuilder::try_from(column_schemas).unwrap();
    let schema = Arc::new(schema_builder.version(123).build().unwrap());
    let meta = TableMetaBuilder::empty()
        .schema(schema.clone())
        .primary_key_indices(vec![1])
        .engine("engine")
        .next_column_id(4)
        .build()
        .unwrap();

    // The names are deliberately listed in a different order than in the schema.
    let alter_kind = AlterKind::DropColumns {
        names: vec!["col3".to_string(), "col1".to_string()],
    };
    let new_meta = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .unwrap()
        .build()
        .unwrap();

    let remaining: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column| column.name.as_str())
        .collect();
    assert_eq!(&["col2", "ts"], &remaining[..]);
    assert_eq!(&[0], &new_meta.primary_key_indices[..]);
    assert_eq!(&[1], &new_meta.value_indices[..]);
    assert_eq!(
        schema.timestamp_column(),
        new_meta.schema.timestamp_column()
    );
}
1518
#[test]
fn test_add_existing_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // Case 1: `col1` already exists and `add_if_not_exists` is off.
    let strict_request = AddColumnRequest {
        column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
        is_key: false,
        location: None,
        add_if_not_exists: false,
    };
    let strict_alter = AlterKind::AddColumns {
        columns: vec![strict_request],
    };
    let outcome = meta.builder_with_alter_kind("my_table", &strict_alter);
    let err = outcome.err().unwrap();
    assert_eq!(StatusCode::TableColumnExists, err.status_code());

    // Case 2: same name, int32 like the existing column, `add_if_not_exists` on.
    let lenient_request = AddColumnRequest {
        column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
        is_key: true,
        location: None,
        add_if_not_exists: true,
    };
    let lenient_alter = AlterKind::AddColumns {
        columns: vec![lenient_request],
    };
    let builder = meta
        .builder_with_alter_kind("my_table", &lenient_alter)
        .unwrap();
    let new_meta = builder.build().unwrap();

    // The columns stay the same while the schema version is bumped by one.
    assert_eq!(
        meta.schema.column_schemas(),
        new_meta.schema.column_schemas()
    );
    assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
}
1565
#[test]
fn test_add_different_type_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // `col1` is int32 in the test schema; here it is requested again as a string
    // column with `add_if_not_exists` turned on.
    let mismatched_request = AddColumnRequest {
        column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
        is_key: false,
        location: None,
        add_if_not_exists: true,
    };
    let alter_kind = AlterKind::AddColumns {
        columns: vec![mismatched_request],
    };

    let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
    let err = outcome.err().unwrap();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
1592
#[test]
fn test_add_invalid_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // Unlike the columns added in the other tests, this one is constructed with
    // `false` as the last `ColumnSchema::new` argument.
    let invalid_column = ColumnSchema::new("weny", ConcreteDataType::string_datatype(), false);
    let alter_kind = AlterKind::AddColumns {
        columns: vec![AddColumnRequest {
            column_schema: invalid_column,
            is_key: false,
            location: None,
            add_if_not_exists: false,
        }],
    };

    let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
    let err = outcome.err().unwrap();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
1624
#[test]
fn test_remove_unknown_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // The test schema has no column named `unknown`.
    let alter_kind = AlterKind::DropColumns {
        names: vec!["unknown".to_string()],
    };

    let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
    let err = outcome.err().unwrap();
    assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
1646
#[test]
fn test_change_unknown_column_data_type() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // The test schema has no column named `unknown`.
    let modify_request = ModifyColumnTypeRequest {
        column_name: "unknown".to_string(),
        target_type: ConcreteDataType::string_datatype(),
    };
    let alter_kind = AlterKind::ModifyColumnTypes {
        columns: vec![modify_request],
    };

    let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
    let err = outcome.err().unwrap();
    assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
1671
#[test]
fn test_remove_key_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // `col1` is the primary key column (index 0) and `ts` is the time index;
    // dropping either one must be rejected.
    for protected_column in ["col1", "ts"] {
        let alter_kind = AlterKind::DropColumns {
            names: vec![protected_column.to_string()],
        };

        let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
        let err = outcome.err().unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }
}
1705
#[test]
fn test_change_key_column_data_type() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // `col1` is the primary key column (index 0) and `ts` is the time index;
    // changing the type of either one must be rejected.
    for protected_column in ["col1", "ts"] {
        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: protected_column.to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };

        let outcome = meta.builder_with_alter_kind("my_table", &alter_kind);
        let err = outcome.err().unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }
}
1745
#[test]
fn test_alloc_new_column() {
    let mut meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();
    assert_eq!(3, meta.next_column_id);

    let requested = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
    let descriptor = meta.alloc_new_column("test_table", &requested).unwrap();

    // One allocation advances the next column id by one and keeps the column name.
    assert_eq!(4, meta.next_column_id);
    assert_eq!(requested.name, descriptor.name);
}
1764
#[test]
fn test_add_columns_with_location() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let altered = add_columns_to_meta_with_location(&meta);
    assert_eq!(meta.region_numbers, altered.region_numbers);

    // `my_tag_first` must land at the front and `my_field_after_ts` right after `ts`.
    let mut column_names: Vec<String> = Vec::new();
    for column in altered.schema.column_schemas() {
        column_names.push(column.name.clone());
    }
    assert_eq!(
        &["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"],
        &column_names[..]
    );
    assert_eq!(&[0, 1], &altered.primary_key_indices[..]);
    assert_eq!(&[2, 3, 4], &altered.value_indices[..]);
}
1792
#[test]
fn test_modify_column_fulltext_options() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // Step 1: `col1` is an int32 column, so setting FULLTEXT on it is rejected.
    let set_on_int_column = AlterKind::SetIndex {
        options: SetIndexOptions::Fulltext {
            column_name: "col1".to_string(),
            options: FulltextOptions::default(),
        },
    };
    let outcome = meta.builder_with_alter_kind("my_table", &set_on_int_column);
    let err = outcome.err().unwrap();
    assert_eq!(
        "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
        err.to_string()
    );

    // Step 2: add string columns so there is something FULLTEXT can be set on.
    let meta_with_strings = add_columns_to_meta_with_location(&meta);
    assert_eq!(meta.region_numbers, meta_with_strings.region_numbers);

    // Step 3: enable FULLTEXT on the string column `my_tag_first`.
    let requested_options = FulltextOptions {
        enable: true,
        analyzer: FulltextAnalyzer::Chinese,
        case_sensitive: true,
        backend: FulltextBackend::Bloom,
    };
    let enable_fulltext = AlterKind::SetIndex {
        options: SetIndexOptions::Fulltext {
            column_name: "my_tag_first".to_string(),
            options: requested_options,
        },
    };
    let enabled_meta = meta_with_strings
        .builder_with_alter_kind("my_table", &enable_fulltext)
        .unwrap()
        .build()
        .unwrap();
    let enabled_options = enabled_meta
        .schema
        .column_schema_by_name("my_tag_first")
        .unwrap()
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(enabled_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, enabled_options.analyzer);
    assert!(enabled_options.case_sensitive);

    // Step 4: unsetting keeps the options on the column but flips `enable` off.
    let disable_fulltext = AlterKind::UnsetIndex {
        options: UnsetIndexOptions::Fulltext {
            column_name: "my_tag_first".to_string(),
        },
    };
    let disabled_meta = enabled_meta
        .builder_with_alter_kind("my_table", &disable_fulltext)
        .unwrap()
        .build()
        .unwrap();
    let disabled_options = disabled_meta
        .schema
        .column_schema_by_name("my_tag_first")
        .unwrap()
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(!disabled_options.enable);
}
1868}