1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39 UnsetIndexOptions,
40};
41
/// Numeric identifier of a table (see [`TableIdent::table_id`]).
pub type TableId = u32;
/// Version counter attached to a table identifier (see [`TableIdent::version`]).
pub type TableVersion = u64;
44
/// Level of support a table offers for a filter pushed down to it.
///
/// The variants correspond one-to-one, by name, to DataFusion's
/// [`TableProviderFilterPushDown`]; `From` conversions exist in both
/// directions.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// Counterpart of `TableProviderFilterPushDown::Unsupported`.
    Unsupported,
    /// Counterpart of `TableProviderFilterPushDown::Inexact`.
    Inexact,
    /// Counterpart of `TableProviderFilterPushDown::Exact`.
    Exact,
}
61
62impl From<TableProviderFilterPushDown> for FilterPushDownType {
63 fn from(value: TableProviderFilterPushDown) -> Self {
64 match value {
65 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
66 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
67 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
68 }
69 }
70}
71
72impl From<FilterPushDownType> for TableProviderFilterPushDown {
73 fn from(value: FilterPushDownType) -> Self {
74 match value {
75 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
76 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
77 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
78 }
79 }
80}
81
/// Kind of a table.
///
/// The `Display` implementation renders the variants as `BASE TABLE`,
/// `VIEW` and `TEMPORARY` respectively.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// An ordinary table; displayed as `BASE TABLE`.
    Base,
    /// A view; displayed as `VIEW`.
    View,
    /// A temporary table; displayed as `TEMPORARY`.
    Temporary,
}
92
93impl std::fmt::Display for TableType {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 TableType::Base => f.write_str("BASE TABLE"),
97 TableType::Temporary => f.write_str("TEMPORARY"),
98 TableType::View => f.write_str("VIEW"),
99 }
100 }
101}
102
/// Identifier of a table: its id together with a version number.
///
/// The derived `Default` yields id `0` and version `0`.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Id of the table.
    pub table_id: TableId,
    /// Version of the table; `TableIdent::new` starts it at `0`.
    pub version: TableVersion,
}
112
/// Metadata of a table: schema, key layout, engine, regions and options.
///
/// A `TableMetaBuilder` is generated by the `Builder` derive using the
/// `mutable` pattern (setters take `&mut self`).
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table.
    pub schema: SchemaRef,
    /// Positions, within `schema`, of the primary key columns.
    pub primary_key_indices: Vec<usize>,
    /// Positions, within `schema`, of the value columns.
    ///
    /// When not set on the builder, this is computed as every column index
    /// that does not appear in `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Name of the table engine; defaults to an empty string in the builder.
    #[builder(default, setter(into))]
    pub engine: String,
    /// Region numbers of the table; `TableInfo::region_ids` combines them
    /// with the table id to form `RegionId`s.
    #[builder(default, setter(into))]
    pub region_numbers: Vec<u32>,
    /// Id assigned to the next column allocated via `alloc_new_column`,
    /// which increments it afterwards.
    pub next_column_id: ColumnId,
    /// Table options (TTL, extra key/value options, ...).
    #[builder(default)]
    pub options: TableOptions,
    /// Creation timestamp; the builder defaults it to the current UTC time.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// NOTE(review): presumably the positions of the partition key columns
    /// within `schema`; not read by the code in this part of the file.
    /// Defaults to an empty vector in the builder.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
}
138
139impl TableMetaBuilder {
140 #[cfg(any(test, feature = "testing"))]
142 pub fn empty() -> Self {
143 Self {
144 schema: None,
145 primary_key_indices: None,
146 value_indices: None,
147 engine: None,
148 region_numbers: None,
149 next_column_id: None,
150 options: None,
151 created_on: None,
152 partition_key_indices: None,
153 }
154 }
155}
156
157impl TableMetaBuilder {
158 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
159 match (&self.primary_key_indices, &self.schema) {
160 (Some(v), Some(schema)) => {
161 let column_schemas = schema.column_schemas();
162 Ok((0..column_schemas.len())
163 .filter(|idx| !v.contains(idx))
164 .collect())
165 }
166 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
167 }
168 }
169
170 pub fn new_external_table() -> Self {
171 Self {
172 schema: None,
173 primary_key_indices: Some(Vec::new()),
174 value_indices: Some(Vec::new()),
175 engine: None,
176 region_numbers: Some(Vec::new()),
177 next_column_id: Some(0),
178 options: None,
179 created_on: None,
180 partition_key_indices: None,
181 }
182 }
183}
184
/// Add-column requests grouped by the location at which each column is to
/// be inserted, as produced by `split_requests_by_column_location`.
struct SplitResult<'a> {
    /// Requests whose location is `AddColumnLocation::First`.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Requests whose location is `AddColumnLocation::After`, keyed by the
    /// name of the existing column they follow.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Requests without an explicit location; they are appended at the end.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all requested columns, in request order.
    column_names: Vec<String>,
}
196
197impl TableMeta {
198 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
199 let columns_schemas = &self.schema.column_schemas();
200 self.primary_key_indices
201 .iter()
202 .map(|idx| &columns_schemas[*idx].name)
203 }
204
205 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
206 let columns_schemas = self.schema.column_schemas();
208 let primary_key_indices = &self.primary_key_indices;
209 columns_schemas
210 .iter()
211 .enumerate()
212 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
213 .map(|(_, cs)| &cs.name)
214 }
215
216 pub fn builder_with_alter_kind(
220 &self,
221 table_name: &str,
222 alter_kind: &AlterKind,
223 ) -> Result<TableMetaBuilder> {
224 match alter_kind {
225 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
226 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
227 AlterKind::ModifyColumnTypes { columns } => {
228 self.modify_column_types(table_name, columns)
229 }
230 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
232 AlterKind::SetTableOptions { options } => self.set_table_options(options),
233 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
234 AlterKind::SetIndex { options } => match options {
235 SetIndexOptions::Fulltext {
236 column_name,
237 options,
238 } => self.change_column_fulltext_options(
239 table_name,
240 column_name,
241 true,
242 Some(options),
243 ),
244 SetIndexOptions::Inverted { column_name } => {
245 self.change_column_modify_inverted_index(table_name, column_name, true)
246 }
247 SetIndexOptions::Skipping {
248 column_name,
249 options,
250 } => self.change_column_skipping_index_options(
251 table_name,
252 column_name,
253 Some(options),
254 ),
255 },
256 AlterKind::UnsetIndex { options } => match options {
257 UnsetIndexOptions::Fulltext { column_name } => {
258 self.change_column_fulltext_options(table_name, column_name, false, None)
259 }
260 UnsetIndexOptions::Inverted { column_name } => {
261 self.change_column_modify_inverted_index(table_name, column_name, false)
262 }
263 UnsetIndexOptions::Skipping { column_name } => {
264 self.change_column_skipping_index_options(table_name, column_name, None)
265 }
266 },
267 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
268 }
269 }
270
271 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
273 let mut new_options = self.options.clone();
274
275 for request in requests {
276 match request {
277 SetRegionOption::Ttl(new_ttl) => {
278 new_options.ttl = *new_ttl;
279 }
280 SetRegionOption::Twsc(key, value) => {
281 if !value.is_empty() {
282 new_options
283 .extra_options
284 .insert(key.to_string(), value.to_string());
285 new_options.extra_options.insert(
287 COMPACTION_TYPE.to_string(),
288 COMPACTION_TYPE_TWCS.to_string(),
289 );
290 } else {
291 new_options.extra_options.remove(key.as_str());
293 }
294 }
295 }
296 }
297 let mut builder = self.new_meta_builder();
298 builder.options(new_options);
299
300 Ok(builder)
301 }
302
303 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
304 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
305 self.set_table_options(&requests)
306 }
307
308 fn change_column_modify_inverted_index(
310 &self,
311 table_name: &str,
312 column_name: &str,
313 value: bool,
314 ) -> Result<TableMetaBuilder> {
315 let table_schema = &self.schema;
316 let mut meta_builder = self.new_meta_builder();
317
318 let mut columns: Vec<ColumnSchema> =
319 Vec::with_capacity(table_schema.column_schemas().len());
320
321 for column_schema in table_schema.column_schemas().iter() {
322 if column_schema.name == column_name {
323 let mut new_column_schema = column_schema.clone();
324 new_column_schema.set_inverted_index(value);
325 columns.push(new_column_schema);
326 } else {
327 columns.push(column_schema.clone());
328 }
329 }
330
331 let mut builder = SchemaBuilder::try_from_columns(columns)
333 .with_context(|_| error::SchemaBuildSnafu {
334 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
335 })?
336 .version(table_schema.version() + 1);
337
338 for (k, v) in table_schema.metadata().iter() {
339 builder = builder.add_metadata(k, v);
340 }
341
342 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
343 msg: format!(
344 "Table {table_name} cannot change fulltext options for column {column_name}",
345 ),
346 })?;
347
348 let _ = meta_builder
349 .schema(Arc::new(new_schema))
350 .primary_key_indices(self.primary_key_indices.clone());
351
352 Ok(meta_builder)
353 }
354
355 fn change_column_fulltext_options(
357 &self,
358 table_name: &str,
359 column_name: &str,
360 enable: bool,
361 options: Option<&FulltextOptions>,
362 ) -> Result<TableMetaBuilder> {
363 let table_schema = &self.schema;
364 let mut meta_builder = self.new_meta_builder();
365
366 let column = &table_schema
367 .column_schema_by_name(column_name)
368 .with_context(|| error::ColumnNotExistsSnafu {
369 column_name,
370 table_name,
371 })?;
372
373 ensure!(
374 column.data_type.is_string(),
375 error::InvalidColumnOptionSnafu {
376 column_name,
377 msg: "FULLTEXT index only supports string type",
378 }
379 );
380
381 let current_fulltext_options = column
382 .fulltext_options()
383 .context(error::SetFulltextOptionsSnafu { column_name })?;
384
385 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
386 for column_schema in table_schema.column_schemas() {
387 if column_schema.name == column_name {
388 let mut new_column_schema = column_schema.clone();
389 if enable {
390 ensure!(
391 options.is_some(),
392 error::InvalidColumnOptionSnafu {
393 column_name,
394 msg: "FULLTEXT index options must be provided",
395 }
396 );
397 set_column_fulltext_options(
398 &mut new_column_schema,
399 column_name,
400 options.unwrap(),
401 current_fulltext_options.clone(),
402 )?
403 } else {
404 unset_column_fulltext_options(
405 &mut new_column_schema,
406 column_name,
407 current_fulltext_options.clone(),
408 )?
409 }
410 columns.push(new_column_schema);
411 } else {
412 columns.push(column_schema.clone());
413 }
414 }
415
416 let mut builder = SchemaBuilder::try_from_columns(columns)
418 .with_context(|_| error::SchemaBuildSnafu {
419 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
420 })?
421 .version(table_schema.version() + 1);
422
423 for (k, v) in table_schema.metadata().iter() {
424 builder = builder.add_metadata(k, v);
425 }
426
427 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
428 msg: format!(
429 "Table {table_name} cannot change fulltext options for column {column_name}",
430 ),
431 })?;
432
433 let _ = meta_builder
434 .schema(Arc::new(new_schema))
435 .primary_key_indices(self.primary_key_indices.clone());
436
437 Ok(meta_builder)
438 }
439
440 fn change_column_skipping_index_options(
442 &self,
443 table_name: &str,
444 column_name: &str,
445 options: Option<&SkippingIndexOptions>,
446 ) -> Result<TableMetaBuilder> {
447 let table_schema = &self.schema;
448 let mut meta_builder = self.new_meta_builder();
449
450 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
451 for column_schema in table_schema.column_schemas() {
452 if column_schema.name == column_name {
453 let mut new_column_schema = column_schema.clone();
454 if let Some(options) = options {
455 set_column_skipping_index_options(
456 &mut new_column_schema,
457 column_name,
458 options,
459 )?;
460 } else {
461 unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
462 }
463 columns.push(new_column_schema);
464 } else {
465 columns.push(column_schema.clone());
466 }
467 }
468
469 let mut builder = SchemaBuilder::try_from_columns(columns)
470 .with_context(|_| error::SchemaBuildSnafu {
471 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
472 })?
473 .version(table_schema.version() + 1);
474
475 for (k, v) in table_schema.metadata().iter() {
476 builder = builder.add_metadata(k, v);
477 }
478
479 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
480 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
481 })?;
482
483 let _ = meta_builder
484 .schema(Arc::new(new_schema))
485 .primary_key_indices(self.primary_key_indices.clone());
486
487 Ok(meta_builder)
488 }
489
490 pub fn alloc_new_column(
495 &mut self,
496 table_name: &str,
497 new_column: &ColumnSchema,
498 ) -> Result<ColumnDescriptor> {
499 let desc = ColumnDescriptorBuilder::new(
500 self.next_column_id as ColumnId,
501 &new_column.name,
502 new_column.data_type.clone(),
503 )
504 .is_nullable(new_column.is_nullable())
505 .default_constraint(new_column.default_constraint().cloned())
506 .build()
507 .context(error::BuildColumnDescriptorSnafu {
508 table_name,
509 column_name: &new_column.name,
510 })?;
511
512 self.next_column_id += 1;
514
515 Ok(desc)
516 }
517
518 fn new_meta_builder(&self) -> TableMetaBuilder {
520 let mut builder = TableMetaBuilder::from(self);
521 builder.value_indices = None;
523 builder
524 }
525
526 fn add_columns(
528 &self,
529 table_name: &str,
530 requests: &[AddColumnRequest],
531 ) -> Result<TableMetaBuilder> {
532 let table_schema = &self.schema;
533 let mut meta_builder = self.new_meta_builder();
534 let original_primary_key_indices: HashSet<&usize> =
535 self.primary_key_indices.iter().collect();
536
537 let mut names = HashSet::with_capacity(requests.len());
538 let mut new_columns = Vec::with_capacity(requests.len());
539 for col_to_add in requests {
540 if let Some(column_schema) =
541 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
542 {
543 ensure!(
545 col_to_add.add_if_not_exists,
546 error::ColumnExistsSnafu {
547 table_name,
548 column_name: &col_to_add.column_schema.name
549 },
550 );
551
552 ensure!(
554 column_schema.data_type == col_to_add.column_schema.data_type,
555 error::InvalidAlterRequestSnafu {
556 table: table_name,
557 err: format!(
558 "column {} already exists with different type {:?}",
559 col_to_add.column_schema.name, column_schema.data_type,
560 ),
561 }
562 );
563 } else {
564 ensure!(
567 names.insert(&col_to_add.column_schema.name),
568 error::InvalidAlterRequestSnafu {
569 table: table_name,
570 err: format!(
571 "add column {} more than once",
572 col_to_add.column_schema.name
573 ),
574 }
575 );
576
577 ensure!(
578 col_to_add.column_schema.is_nullable()
579 || col_to_add.column_schema.default_constraint().is_some(),
580 error::InvalidAlterRequestSnafu {
581 table: table_name,
582 err: format!(
583 "no default value for column {}",
584 col_to_add.column_schema.name
585 ),
586 },
587 );
588
589 new_columns.push(col_to_add.clone());
590 }
591 }
592 let requests = &new_columns[..];
593
594 let SplitResult {
595 columns_at_first,
596 columns_at_after,
597 columns_at_last,
598 column_names,
599 } = self.split_requests_by_column_location(table_name, requests)?;
600 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
601 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
602 columns_at_first.iter().rev().for_each(|request| {
604 if request.is_key {
605 primary_key_indices.push(columns.len());
607 }
608 columns.push(request.column_schema.clone());
609 });
610 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
612 if original_primary_key_indices.contains(&index) {
613 primary_key_indices.push(columns.len());
614 }
615 columns.push(column_schema.clone());
616 if let Some(requests) = columns_at_after.get(&column_schema.name) {
617 requests.iter().rev().for_each(|request| {
618 if request.is_key {
619 primary_key_indices.push(columns.len());
621 }
622 columns.push(request.column_schema.clone());
623 });
624 }
625 }
626 columns_at_last.iter().for_each(|request| {
628 if request.is_key {
629 primary_key_indices.push(columns.len());
631 }
632 columns.push(request.column_schema.clone());
633 });
634
635 let mut builder = SchemaBuilder::try_from(columns)
636 .with_context(|_| error::SchemaBuildSnafu {
637 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
638 })?
639 .version(table_schema.version() + 1);
641 for (k, v) in table_schema.metadata().iter() {
642 builder = builder.add_metadata(k, v);
643 }
644 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
645 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
646 })?;
647
648 let _ = meta_builder
650 .schema(Arc::new(new_schema))
651 .primary_key_indices(primary_key_indices);
652
653 Ok(meta_builder)
654 }
655
656 fn remove_columns(
657 &self,
658 table_name: &str,
659 column_names: &[String],
660 ) -> Result<TableMetaBuilder> {
661 let table_schema = &self.schema;
662 let column_names: HashSet<_> = column_names.iter().collect();
663 let mut meta_builder = self.new_meta_builder();
664
665 let timestamp_index = table_schema.timestamp_index();
666 for column_name in &column_names {
668 if let Some(index) = table_schema.column_index_by_name(column_name) {
669 ensure!(
672 !self.primary_key_indices.contains(&index),
673 error::RemoveColumnInIndexSnafu {
674 column_name: *column_name,
675 table_name,
676 }
677 );
678
679 if let Some(ts_index) = timestamp_index {
680 ensure!(
682 index != ts_index,
683 error::RemoveColumnInIndexSnafu {
684 column_name: table_schema.column_name_by_index(ts_index),
685 table_name,
686 }
687 );
688 }
689 } else {
690 return error::ColumnNotExistsSnafu {
691 column_name: *column_name,
692 table_name,
693 }
694 .fail()?;
695 }
696 }
697
698 let columns: Vec<_> = table_schema
700 .column_schemas()
701 .iter()
702 .filter(|column_schema| !column_names.contains(&column_schema.name))
703 .cloned()
704 .collect();
705
706 let mut builder = SchemaBuilder::try_from_columns(columns)
707 .with_context(|_| error::SchemaBuildSnafu {
708 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
709 })?
710 .version(table_schema.version() + 1);
712 for (k, v) in table_schema.metadata().iter() {
713 builder = builder.add_metadata(k, v);
714 }
715 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
716 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
717 })?;
718
719 let primary_key_indices = self
721 .primary_key_indices
722 .iter()
723 .map(|idx| table_schema.column_name_by_index(*idx))
724 .map(|name| new_schema.column_index_by_name(name).unwrap())
726 .collect();
727
728 let _ = meta_builder
729 .schema(Arc::new(new_schema))
730 .primary_key_indices(primary_key_indices);
731
732 Ok(meta_builder)
733 }
734
735 fn modify_column_types(
736 &self,
737 table_name: &str,
738 requests: &[ModifyColumnTypeRequest],
739 ) -> Result<TableMetaBuilder> {
740 let table_schema = &self.schema;
741 let mut meta_builder = self.new_meta_builder();
742
743 let mut modify_column_types = HashMap::with_capacity(requests.len());
744 let timestamp_index = table_schema.timestamp_index();
745
746 for col_to_change in requests {
747 let change_column_name = &col_to_change.column_name;
748
749 let index = table_schema
750 .column_index_by_name(change_column_name)
751 .with_context(|| error::ColumnNotExistsSnafu {
752 column_name: change_column_name,
753 table_name,
754 })?;
755
756 let column = &table_schema.column_schemas()[index];
757
758 ensure!(
759 !self.primary_key_indices.contains(&index),
760 error::InvalidAlterRequestSnafu {
761 table: table_name,
762 err: format!(
763 "Not allowed to change primary key index column '{}'",
764 column.name
765 )
766 }
767 );
768
769 if let Some(ts_index) = timestamp_index {
770 ensure!(
772 index != ts_index,
773 error::InvalidAlterRequestSnafu {
774 table: table_name,
775 err: format!(
776 "Not allowed to change timestamp index column '{}' datatype",
777 column.name
778 )
779 }
780 );
781 }
782
783 ensure!(
784 modify_column_types
785 .insert(&col_to_change.column_name, col_to_change)
786 .is_none(),
787 error::InvalidAlterRequestSnafu {
788 table: table_name,
789 err: format!(
790 "change column datatype {} more than once",
791 col_to_change.column_name
792 ),
793 }
794 );
795
796 ensure!(
797 column
798 .data_type
799 .can_arrow_type_cast_to(&col_to_change.target_type),
800 error::InvalidAlterRequestSnafu {
801 table: table_name,
802 err: format!(
803 "column '{}' cannot be cast automatically to type '{}'",
804 col_to_change.column_name, col_to_change.target_type,
805 ),
806 }
807 );
808
809 ensure!(
810 column.is_nullable(),
811 error::InvalidAlterRequestSnafu {
812 table: table_name,
813 err: format!(
814 "column '{}' must be nullable to ensure safe conversion.",
815 col_to_change.column_name,
816 ),
817 }
818 );
819 }
820 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
823 for mut column in table_schema.column_schemas().iter().cloned() {
824 if let Some(change_column) = modify_column_types.get(&column.name) {
825 column.data_type = change_column.target_type.clone();
826 let new_default = if let Some(default_value) = column.default_constraint() {
827 Some(
828 default_value
829 .cast_to_datatype(&change_column.target_type)
830 .with_context(|_| error::CastDefaultValueSnafu {
831 reason: format!(
832 "Failed to cast default value from {:?} to type {:?}",
833 default_value, &change_column.target_type
834 ),
835 })?,
836 )
837 } else {
838 None
839 };
840 column = column
841 .clone()
842 .with_default_constraint(new_default.clone())
843 .with_context(|_| error::CastDefaultValueSnafu {
844 reason: format!("Failed to set new default: {:?}", new_default),
845 })?;
846 }
847 columns.push(column)
848 }
849
850 let mut builder = SchemaBuilder::try_from_columns(columns)
851 .with_context(|_| error::SchemaBuildSnafu {
852 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
853 })?
854 .version(table_schema.version() + 1);
856 for (k, v) in table_schema.metadata().iter() {
857 builder = builder.add_metadata(k, v);
858 }
859 let new_schema = builder.build().with_context(|_| {
860 let column_names: Vec<_> = requests
861 .iter()
862 .map(|request| &request.column_name)
863 .collect();
864
865 error::SchemaBuildSnafu {
866 msg: format!(
867 "Table {table_name} cannot change datatype with columns {column_names:?}"
868 ),
869 }
870 })?;
871
872 let _ = meta_builder
873 .schema(Arc::new(new_schema))
874 .primary_key_indices(self.primary_key_indices.clone());
875
876 Ok(meta_builder)
877 }
878
879 fn split_requests_by_column_location<'a>(
881 &self,
882 table_name: &str,
883 requests: &'a [AddColumnRequest],
884 ) -> Result<SplitResult<'a>> {
885 let table_schema = &self.schema;
886 let mut columns_at_first = Vec::new();
887 let mut columns_at_after = HashMap::new();
888 let mut columns_at_last = Vec::new();
889 let mut column_names = Vec::with_capacity(requests.len());
890 for request in requests {
891 let column_name = &request.column_schema.name;
893 column_names.push(column_name.clone());
894 ensure!(
895 table_schema.column_schema_by_name(column_name).is_none(),
896 error::ColumnExistsSnafu {
897 column_name,
898 table_name,
899 }
900 );
901 match request.location.as_ref() {
902 Some(AddColumnLocation::First) => {
903 columns_at_first.push(request);
904 }
905 Some(AddColumnLocation::After { column_name }) => {
906 ensure!(
907 table_schema.column_schema_by_name(column_name).is_some(),
908 error::ColumnNotExistsSnafu {
909 column_name,
910 table_name,
911 }
912 );
913 columns_at_after
914 .entry(column_name.clone())
915 .or_insert(Vec::new())
916 .push(request);
917 }
918 None => {
919 columns_at_last.push(request);
920 }
921 }
922 }
923 Ok(SplitResult {
924 columns_at_first,
925 columns_at_after,
926 columns_at_last,
927 column_names,
928 })
929 }
930
931 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
932 let table_schema = &self.schema;
933 let mut meta_builder = self.new_meta_builder();
934 let mut columns = Vec::with_capacity(table_schema.num_columns());
935 for column_schema in table_schema.column_schemas() {
936 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
937 ensure!(
939 column_schema.default_constraint().is_some(),
940 error::InvalidAlterRequestSnafu {
941 table: table_name,
942 err: format!("column {name} does not have a default value"),
943 }
944 );
945 if !column_schema.is_nullable() {
946 return error::InvalidAlterRequestSnafu {
947 table: table_name,
948 err: format!(
949 "column {name} is not nullable and `default` cannot be dropped",
950 ),
951 }
952 .fail();
953 }
954 let new_column_schema = column_schema.clone();
955 let new_column_schema = new_column_schema
956 .with_default_constraint(None)
957 .with_context(|_| error::SchemaBuildSnafu {
958 msg: format!("Table {table_name} cannot drop default values"),
959 })?;
960 columns.push(new_column_schema);
961 } else {
962 columns.push(column_schema.clone());
963 }
964 }
965
966 let mut builder = SchemaBuilder::try_from_columns(columns)
967 .with_context(|_| error::SchemaBuildSnafu {
968 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
969 })?
970 .version(table_schema.version() + 1);
972 for (k, v) in table_schema.metadata().iter() {
973 builder = builder.add_metadata(k, v);
974 }
975 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
976 msg: format!("Table {table_name} cannot drop default values"),
977 })?;
978
979 let _ = meta_builder.schema(Arc::new(new_schema));
980
981 Ok(meta_builder)
982 }
983}
/// Full description of a table: identity, naming and metadata.
///
/// A `TableInfoBuilder` is generated by the `Builder` derive using the
/// `owned` pattern (setters take and return the builder by value).
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table; defaults to `TableIdent::default()`.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table.
    #[builder(setter(into))]
    pub name: String,
    /// Optional description of the table; defaults to `None`.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Catalog the table belongs to; defaults to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Schema (database) the table belongs to; defaults to
    /// `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table.
    pub meta: TableMeta,
    /// Kind of the table; defaults to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
1004
/// Reference-counted, shared handle to a [`TableInfo`].
pub type TableInfoRef = Arc<TableInfo>;
1006
1007impl TableInfo {
1008 pub fn table_id(&self) -> TableId {
1009 self.ident.table_id
1010 }
1011
1012 pub fn region_ids(&self) -> Vec<RegionId> {
1013 self.meta
1014 .region_numbers
1015 .iter()
1016 .map(|id| RegionId::new(self.table_id(), *id))
1017 .collect()
1018 }
1019 pub fn full_table_name(&self) -> String {
1021 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1022 }
1023
1024 pub fn is_physical_table(&self) -> bool {
1026 self.meta
1027 .options
1028 .extra_options
1029 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1030 }
1031
1032 pub fn is_ttl_instant_table(&self) -> bool {
1034 self.meta
1035 .options
1036 .ttl
1037 .map(|t| t.is_instant())
1038 .unwrap_or(false)
1039 }
1040}
1041
1042impl TableInfoBuilder {
1043 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1044 Self {
1045 name: Some(name.into()),
1046 meta: Some(meta),
1047 ..Default::default()
1048 }
1049 }
1050
1051 pub fn table_id(mut self, id: TableId) -> Self {
1052 let ident = self.ident.get_or_insert_with(TableIdent::default);
1053 ident.table_id = id;
1054 self
1055 }
1056
1057 pub fn table_version(mut self, version: TableVersion) -> Self {
1058 let ident = self.ident.get_or_insert_with(TableIdent::default);
1059 ident.version = version;
1060 self
1061 }
1062}
1063
1064impl TableIdent {
1065 pub fn new(table_id: TableId) -> Self {
1066 Self {
1067 table_id,
1068 version: 0,
1069 }
1070 }
1071}
1072
1073impl From<TableId> for TableIdent {
1074 fn from(table_id: TableId) -> Self {
1075 Self::new(table_id)
1076 }
1077}
1078
/// Serializable counterpart of [`TableMeta`]: the same fields, with the
/// schema stored as a [`RawSchema`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct RawTableMeta {
    /// Raw form of the table schema.
    pub schema: RawSchema,
    /// Positions, within `schema`, of the primary key columns.
    pub primary_key_indices: Vec<usize>,
    /// Positions, within `schema`, of the value columns.
    pub value_indices: Vec<usize>,
    /// Name of the table engine.
    pub engine: String,
    /// Id to assign to the next allocated column.
    pub next_column_id: ColumnId,
    /// Region numbers of the table.
    pub region_numbers: Vec<u32>,
    /// Table options.
    pub options: TableOptions,
    /// Creation timestamp.
    pub created_on: DateTime<Utc>,
    /// NOTE(review): presumably the positions of the partition key columns.
    /// Deserializes to an empty vector when the field is absent from the
    /// input.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
}
1100
1101impl From<TableMeta> for RawTableMeta {
1102 fn from(meta: TableMeta) -> RawTableMeta {
1103 RawTableMeta {
1104 schema: RawSchema::from(&*meta.schema),
1105 primary_key_indices: meta.primary_key_indices,
1106 value_indices: meta.value_indices,
1107 engine: meta.engine,
1108 next_column_id: meta.next_column_id,
1109 region_numbers: meta.region_numbers,
1110 options: meta.options,
1111 created_on: meta.created_on,
1112 partition_key_indices: meta.partition_key_indices,
1113 }
1114 }
1115}
1116
1117impl TryFrom<RawTableMeta> for TableMeta {
1118 type Error = ConvertError;
1119
1120 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1121 Ok(TableMeta {
1122 schema: Arc::new(Schema::try_from(raw.schema)?),
1123 primary_key_indices: raw.primary_key_indices,
1124 value_indices: raw.value_indices,
1125 engine: raw.engine,
1126 region_numbers: raw.region_numbers,
1127 next_column_id: raw.next_column_id,
1128 options: raw.options,
1129 created_on: raw.created_on,
1130 partition_key_indices: raw.partition_key_indices,
1131 })
1132 }
1133}
1134
/// Serializable counterpart of [`TableInfo`]: the same fields, with the
/// metadata stored as a [`RawTableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Id and version of the table.
    pub ident: TableIdent,
    /// Name of the table.
    pub name: String,
    /// Optional description of the table.
    pub desc: Option<String>,
    /// Catalog the table belongs to.
    pub catalog_name: String,
    /// Schema (database) the table belongs to.
    pub schema_name: String,
    /// Raw metadata of the table.
    pub meta: RawTableMeta,
    /// Kind of the table.
    pub table_type: TableType,
}
1146
1147impl RawTableInfo {
1148 pub fn sort_columns(&mut self) {
1150 let column_schemas = &self.meta.schema.column_schemas;
1151 let primary_keys = self
1152 .meta
1153 .primary_key_indices
1154 .iter()
1155 .map(|index| column_schemas[*index].name.clone())
1156 .collect::<HashSet<_>>();
1157
1158 self.meta
1159 .schema
1160 .column_schemas
1161 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1162
1163 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1165 let mut timestamp_index = None;
1166 let mut value_indices =
1167 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
1168 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1169 if primary_keys.contains(&column_schema.name) {
1170 primary_key_indices.push(index);
1171 } else if column_schema.is_time_index() {
1172 timestamp_index = Some(index);
1173 } else {
1174 value_indices.push(index);
1175 }
1176 }
1177
1178 self.meta.schema.timestamp_index = timestamp_index;
1180 self.meta.primary_key_indices = primary_key_indices;
1181 self.meta.value_indices = value_indices;
1182 }
1183
1184 pub fn to_region_options(&self) -> HashMap<String, String> {
1188 HashMap::from(&self.meta.options)
1189 }
1190}
1191
1192impl From<TableInfo> for RawTableInfo {
1193 fn from(info: TableInfo) -> RawTableInfo {
1194 RawTableInfo {
1195 ident: info.ident,
1196 name: info.name,
1197 desc: info.desc,
1198 catalog_name: info.catalog_name,
1199 schema_name: info.schema_name,
1200 meta: RawTableMeta::from(info.meta),
1201 table_type: info.table_type,
1202 }
1203 }
1204}
1205
1206impl TryFrom<RawTableInfo> for TableInfo {
1207 type Error = ConvertError;
1208
1209 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1210 Ok(TableInfo {
1211 ident: raw.ident,
1212 name: raw.name,
1213 desc: raw.desc,
1214 catalog_name: raw.catalog_name,
1215 schema_name: raw.schema_name,
1216 meta: TableMeta::try_from(raw.meta)?,
1217 table_type: raw.table_type,
1218 })
1219 }
1220}
1221
1222fn set_column_fulltext_options(
1231 column_schema: &mut ColumnSchema,
1232 column_name: &str,
1233 options: &FulltextOptions,
1234 current_options: Option<FulltextOptions>,
1235) -> Result<()> {
1236 if let Some(current_options) = current_options {
1237 ensure!(
1238 current_options.analyzer == options.analyzer
1239 && current_options.case_sensitive == options.case_sensitive,
1240 error::InvalidColumnOptionSnafu {
1241 column_name,
1242 msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1243 current_options.analyzer, current_options.case_sensitive),
1244 }
1245 );
1246 }
1247
1248 column_schema
1249 .set_fulltext_options(options)
1250 .context(error::SetFulltextOptionsSnafu { column_name })?;
1251
1252 Ok(())
1253}
1254
1255fn unset_column_fulltext_options(
1256 column_schema: &mut ColumnSchema,
1257 column_name: &str,
1258 current_options: Option<FulltextOptions>,
1259) -> Result<()> {
1260 ensure!(
1261 current_options
1262 .as_ref()
1263 .is_some_and(|options| options.enable),
1264 error::InvalidColumnOptionSnafu {
1265 column_name,
1266 msg: "FULLTEXT index already disabled".to_string(),
1267 }
1268 );
1269
1270 let mut options = current_options.unwrap();
1271 options.enable = false;
1272 column_schema
1273 .set_fulltext_options(&options)
1274 .context(error::SetFulltextOptionsSnafu { column_name })?;
1275
1276 Ok(())
1277}
1278
1279fn set_column_skipping_index_options(
1280 column_schema: &mut ColumnSchema,
1281 column_name: &str,
1282 options: &SkippingIndexOptions,
1283) -> Result<()> {
1284 column_schema
1285 .set_skipping_options(options)
1286 .context(error::SetSkippingOptionsSnafu { column_name })?;
1287
1288 Ok(())
1289}
1290
1291fn unset_column_skipping_index_options(
1292 column_schema: &mut ColumnSchema,
1293 column_name: &str,
1294) -> Result<()> {
1295 column_schema
1296 .unset_skipping_options()
1297 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1298 Ok(())
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303 use common_error::ext::ErrorExt;
1304 use common_error::status_code::StatusCode;
1305 use datatypes::data_type::ConcreteDataType;
1306 use datatypes::schema::{
1307 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1308 };
1309
1310 use super::*;
1311
1312 fn new_test_schema() -> Schema {
1314 let column_schemas = vec![
1315 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1316 ColumnSchema::new(
1317 "ts",
1318 ConcreteDataType::timestamp_millisecond_datatype(),
1319 false,
1320 )
1321 .with_time_index(true),
1322 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1323 ];
1324 SchemaBuilder::try_from(column_schemas)
1325 .unwrap()
1326 .version(123)
1327 .build()
1328 .unwrap()
1329 }
1330
1331 #[test]
1332 fn test_raw_convert() {
1333 let schema = Arc::new(new_test_schema());
1334 let meta = TableMetaBuilder::empty()
1335 .schema(schema)
1336 .primary_key_indices(vec![0])
1337 .engine("engine")
1338 .next_column_id(3)
1339 .build()
1340 .unwrap();
1341 let info = TableInfoBuilder::default()
1342 .table_id(10)
1343 .table_version(5)
1344 .name("mytable")
1345 .meta(meta)
1346 .build()
1347 .unwrap();
1348
1349 let raw = RawTableInfo::from(info.clone());
1350 let info_new = TableInfo::try_from(raw).unwrap();
1351
1352 assert_eq!(info, info_new);
1353 }
1354
1355 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1356 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1357 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1358 let alter_kind = AlterKind::AddColumns {
1359 columns: vec![
1360 AddColumnRequest {
1361 column_schema: new_tag,
1362 is_key: true,
1363 location: None,
1364 add_if_not_exists: false,
1365 },
1366 AddColumnRequest {
1367 column_schema: new_field,
1368 is_key: false,
1369 location: None,
1370 add_if_not_exists: false,
1371 },
1372 ],
1373 };
1374
1375 let builder = meta
1376 .builder_with_alter_kind("my_table", &alter_kind)
1377 .unwrap();
1378 builder.build().unwrap()
1379 }
1380
1381 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1382 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1383 let new_field = ColumnSchema::new(
1384 "my_field_after_ts",
1385 ConcreteDataType::string_datatype(),
1386 true,
1387 );
1388 let alter_kind = AlterKind::AddColumns {
1389 columns: vec![
1390 AddColumnRequest {
1391 column_schema: new_tag,
1392 is_key: true,
1393 location: Some(AddColumnLocation::First),
1394 add_if_not_exists: false,
1395 },
1396 AddColumnRequest {
1397 column_schema: new_field,
1398 is_key: false,
1399 location: Some(AddColumnLocation::After {
1400 column_name: "ts".to_string(),
1401 }),
1402 add_if_not_exists: false,
1403 },
1404 ],
1405 };
1406
1407 let builder = meta
1408 .builder_with_alter_kind("my_table", &alter_kind)
1409 .unwrap();
1410 builder.build().unwrap()
1411 }
1412
1413 #[test]
1414 fn test_add_columns() {
1415 let schema = Arc::new(new_test_schema());
1416 let meta = TableMetaBuilder::empty()
1417 .schema(schema)
1418 .primary_key_indices(vec![0])
1419 .engine("engine")
1420 .next_column_id(3)
1421 .build()
1422 .unwrap();
1423
1424 let new_meta = add_columns_to_meta(&meta);
1425 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1426
1427 let names: Vec<String> = new_meta
1428 .schema
1429 .column_schemas()
1430 .iter()
1431 .map(|column_schema| column_schema.name.clone())
1432 .collect();
1433 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1434 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1435 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1436 }
1437
1438 #[test]
1439 fn test_add_columns_multiple_times() {
1440 let schema = Arc::new(new_test_schema());
1441 let meta = TableMetaBuilder::empty()
1442 .schema(schema)
1443 .primary_key_indices(vec![0])
1444 .engine("engine")
1445 .next_column_id(3)
1446 .build()
1447 .unwrap();
1448
1449 let alter_kind = AlterKind::AddColumns {
1450 columns: vec![
1451 AddColumnRequest {
1452 column_schema: ColumnSchema::new(
1453 "col3",
1454 ConcreteDataType::int32_datatype(),
1455 true,
1456 ),
1457 is_key: true,
1458 location: None,
1459 add_if_not_exists: true,
1460 },
1461 AddColumnRequest {
1462 column_schema: ColumnSchema::new(
1463 "col3",
1464 ConcreteDataType::int32_datatype(),
1465 true,
1466 ),
1467 is_key: true,
1468 location: None,
1469 add_if_not_exists: true,
1470 },
1471 ],
1472 };
1473 let err = meta
1474 .builder_with_alter_kind("my_table", &alter_kind)
1475 .err()
1476 .unwrap();
1477 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1478 }
1479
1480 #[test]
1481 fn test_remove_columns() {
1482 let schema = Arc::new(new_test_schema());
1483 let meta = TableMetaBuilder::empty()
1484 .schema(schema.clone())
1485 .primary_key_indices(vec![0])
1486 .engine("engine")
1487 .next_column_id(3)
1488 .build()
1489 .unwrap();
1490 let meta = add_columns_to_meta(&meta);
1492
1493 let alter_kind = AlterKind::DropColumns {
1494 names: vec![String::from("col2"), String::from("my_field")],
1495 };
1496 let new_meta = meta
1497 .builder_with_alter_kind("my_table", &alter_kind)
1498 .unwrap()
1499 .build()
1500 .unwrap();
1501
1502 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1503
1504 let names: Vec<String> = new_meta
1505 .schema
1506 .column_schemas()
1507 .iter()
1508 .map(|column_schema| column_schema.name.clone())
1509 .collect();
1510 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1511 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1512 assert_eq!(&[1], &new_meta.value_indices[..]);
1513 assert_eq!(
1514 schema.timestamp_column(),
1515 new_meta.schema.timestamp_column()
1516 );
1517 }
1518
1519 #[test]
1520 fn test_remove_multiple_columns_before_timestamp() {
1521 let column_schemas = vec![
1522 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1523 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1524 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1525 ColumnSchema::new(
1526 "ts",
1527 ConcreteDataType::timestamp_millisecond_datatype(),
1528 false,
1529 )
1530 .with_time_index(true),
1531 ];
1532 let schema = Arc::new(
1533 SchemaBuilder::try_from(column_schemas)
1534 .unwrap()
1535 .version(123)
1536 .build()
1537 .unwrap(),
1538 );
1539 let meta = TableMetaBuilder::empty()
1540 .schema(schema.clone())
1541 .primary_key_indices(vec![1])
1542 .engine("engine")
1543 .next_column_id(4)
1544 .build()
1545 .unwrap();
1546
1547 let alter_kind = AlterKind::DropColumns {
1549 names: vec![String::from("col3"), String::from("col1")],
1550 };
1551 let new_meta = meta
1552 .builder_with_alter_kind("my_table", &alter_kind)
1553 .unwrap()
1554 .build()
1555 .unwrap();
1556
1557 let names: Vec<String> = new_meta
1558 .schema
1559 .column_schemas()
1560 .iter()
1561 .map(|column_schema| column_schema.name.clone())
1562 .collect();
1563 assert_eq!(&["col2", "ts"], &names[..]);
1564 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1565 assert_eq!(&[1], &new_meta.value_indices[..]);
1566 assert_eq!(
1567 schema.timestamp_column(),
1568 new_meta.schema.timestamp_column()
1569 );
1570 }
1571
1572 #[test]
1573 fn test_add_existing_column() {
1574 let schema = Arc::new(new_test_schema());
1575 let meta = TableMetaBuilder::empty()
1576 .schema(schema)
1577 .primary_key_indices(vec![0])
1578 .engine("engine")
1579 .next_column_id(3)
1580 .build()
1581 .unwrap();
1582
1583 let alter_kind = AlterKind::AddColumns {
1584 columns: vec![AddColumnRequest {
1585 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1586 is_key: false,
1587 location: None,
1588 add_if_not_exists: false,
1589 }],
1590 };
1591
1592 let err = meta
1593 .builder_with_alter_kind("my_table", &alter_kind)
1594 .err()
1595 .unwrap();
1596 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1597
1598 let alter_kind = AlterKind::AddColumns {
1600 columns: vec![AddColumnRequest {
1601 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1602 is_key: true,
1603 location: None,
1604 add_if_not_exists: true,
1605 }],
1606 };
1607 let new_meta = meta
1608 .builder_with_alter_kind("my_table", &alter_kind)
1609 .unwrap()
1610 .build()
1611 .unwrap();
1612 assert_eq!(
1613 meta.schema.column_schemas(),
1614 new_meta.schema.column_schemas()
1615 );
1616 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1617 }
1618
1619 #[test]
1620 fn test_add_different_type_column() {
1621 let schema = Arc::new(new_test_schema());
1622 let meta = TableMetaBuilder::empty()
1623 .schema(schema)
1624 .primary_key_indices(vec![0])
1625 .engine("engine")
1626 .next_column_id(3)
1627 .build()
1628 .unwrap();
1629
1630 let alter_kind = AlterKind::AddColumns {
1632 columns: vec![AddColumnRequest {
1633 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1634 is_key: false,
1635 location: None,
1636 add_if_not_exists: true,
1637 }],
1638 };
1639 let err = meta
1640 .builder_with_alter_kind("my_table", &alter_kind)
1641 .err()
1642 .unwrap();
1643 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1644 }
1645
1646 #[test]
1647 fn test_add_invalid_column() {
1648 let schema = Arc::new(new_test_schema());
1649 let meta = TableMetaBuilder::empty()
1650 .schema(schema)
1651 .primary_key_indices(vec![0])
1652 .engine("engine")
1653 .next_column_id(3)
1654 .build()
1655 .unwrap();
1656
1657 let alter_kind = AlterKind::AddColumns {
1659 columns: vec![AddColumnRequest {
1660 column_schema: ColumnSchema::new(
1661 "weny",
1662 ConcreteDataType::string_datatype(),
1663 false,
1664 ),
1665 is_key: false,
1666 location: None,
1667 add_if_not_exists: false,
1668 }],
1669 };
1670
1671 let err = meta
1672 .builder_with_alter_kind("my_table", &alter_kind)
1673 .err()
1674 .unwrap();
1675 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1676 }
1677
1678 #[test]
1679 fn test_remove_unknown_column() {
1680 let schema = Arc::new(new_test_schema());
1681 let meta = TableMetaBuilder::empty()
1682 .schema(schema)
1683 .primary_key_indices(vec![0])
1684 .engine("engine")
1685 .next_column_id(3)
1686 .build()
1687 .unwrap();
1688
1689 let alter_kind = AlterKind::DropColumns {
1690 names: vec![String::from("unknown")],
1691 };
1692
1693 let err = meta
1694 .builder_with_alter_kind("my_table", &alter_kind)
1695 .err()
1696 .unwrap();
1697 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1698 }
1699
1700 #[test]
1701 fn test_change_unknown_column_data_type() {
1702 let schema = Arc::new(new_test_schema());
1703 let meta = TableMetaBuilder::empty()
1704 .schema(schema)
1705 .primary_key_indices(vec![0])
1706 .engine("engine")
1707 .next_column_id(3)
1708 .build()
1709 .unwrap();
1710
1711 let alter_kind = AlterKind::ModifyColumnTypes {
1712 columns: vec![ModifyColumnTypeRequest {
1713 column_name: "unknown".to_string(),
1714 target_type: ConcreteDataType::string_datatype(),
1715 }],
1716 };
1717
1718 let err = meta
1719 .builder_with_alter_kind("my_table", &alter_kind)
1720 .err()
1721 .unwrap();
1722 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1723 }
1724
1725 #[test]
1726 fn test_remove_key_column() {
1727 let schema = Arc::new(new_test_schema());
1728 let meta = TableMetaBuilder::empty()
1729 .schema(schema)
1730 .primary_key_indices(vec![0])
1731 .engine("engine")
1732 .next_column_id(3)
1733 .build()
1734 .unwrap();
1735
1736 let alter_kind = AlterKind::DropColumns {
1738 names: vec![String::from("col1")],
1739 };
1740
1741 let err = meta
1742 .builder_with_alter_kind("my_table", &alter_kind)
1743 .err()
1744 .unwrap();
1745 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1746
1747 let alter_kind = AlterKind::DropColumns {
1749 names: vec![String::from("ts")],
1750 };
1751
1752 let err = meta
1753 .builder_with_alter_kind("my_table", &alter_kind)
1754 .err()
1755 .unwrap();
1756 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1757 }
1758
1759 #[test]
1760 fn test_change_key_column_data_type() {
1761 let schema = Arc::new(new_test_schema());
1762 let meta = TableMetaBuilder::empty()
1763 .schema(schema)
1764 .primary_key_indices(vec![0])
1765 .engine("engine")
1766 .next_column_id(3)
1767 .build()
1768 .unwrap();
1769
1770 let alter_kind = AlterKind::ModifyColumnTypes {
1772 columns: vec![ModifyColumnTypeRequest {
1773 column_name: "col1".to_string(),
1774 target_type: ConcreteDataType::string_datatype(),
1775 }],
1776 };
1777
1778 let err = meta
1779 .builder_with_alter_kind("my_table", &alter_kind)
1780 .err()
1781 .unwrap();
1782 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1783
1784 let alter_kind = AlterKind::ModifyColumnTypes {
1786 columns: vec![ModifyColumnTypeRequest {
1787 column_name: "ts".to_string(),
1788 target_type: ConcreteDataType::string_datatype(),
1789 }],
1790 };
1791
1792 let err = meta
1793 .builder_with_alter_kind("my_table", &alter_kind)
1794 .err()
1795 .unwrap();
1796 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1797 }
1798
1799 #[test]
1800 fn test_alloc_new_column() {
1801 let schema = Arc::new(new_test_schema());
1802 let mut meta = TableMetaBuilder::empty()
1803 .schema(schema)
1804 .primary_key_indices(vec![0])
1805 .engine("engine")
1806 .next_column_id(3)
1807 .build()
1808 .unwrap();
1809 assert_eq!(3, meta.next_column_id);
1810
1811 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1812 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1813
1814 assert_eq!(4, meta.next_column_id);
1815 assert_eq!(column_schema.name, desc.name);
1816 }
1817
1818 #[test]
1819 fn test_add_columns_with_location() {
1820 let schema = Arc::new(new_test_schema());
1821 let meta = TableMetaBuilder::empty()
1822 .schema(schema)
1823 .primary_key_indices(vec![0])
1824 .engine("engine")
1825 .next_column_id(3)
1826 .build()
1827 .unwrap();
1828
1829 let new_meta = add_columns_to_meta_with_location(&meta);
1830 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1831
1832 let names: Vec<String> = new_meta
1833 .schema
1834 .column_schemas()
1835 .iter()
1836 .map(|column_schema| column_schema.name.clone())
1837 .collect();
1838 assert_eq!(
1839 &["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"],
1840 &names[..]
1841 );
1842 assert_eq!(&[0, 1], &new_meta.primary_key_indices[..]);
1843 assert_eq!(&[2, 3, 4], &new_meta.value_indices[..]);
1844 }
1845
1846 #[test]
1847 fn test_modify_column_fulltext_options() {
1848 let schema = Arc::new(new_test_schema());
1849 let meta = TableMetaBuilder::empty()
1850 .schema(schema)
1851 .primary_key_indices(vec![0])
1852 .engine("engine")
1853 .next_column_id(3)
1854 .build()
1855 .unwrap();
1856
1857 let alter_kind = AlterKind::SetIndex {
1858 options: SetIndexOptions::Fulltext {
1859 column_name: "col1".to_string(),
1860 options: FulltextOptions::default(),
1861 },
1862 };
1863 let err = meta
1864 .builder_with_alter_kind("my_table", &alter_kind)
1865 .err()
1866 .unwrap();
1867 assert_eq!(
1868 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
1869 err.to_string()
1870 );
1871
1872 let new_meta = add_columns_to_meta_with_location(&meta);
1874 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1875
1876 let alter_kind = AlterKind::SetIndex {
1877 options: SetIndexOptions::Fulltext {
1878 column_name: "my_tag_first".to_string(),
1879 options: FulltextOptions {
1880 enable: true,
1881 analyzer: FulltextAnalyzer::Chinese,
1882 case_sensitive: true,
1883 backend: FulltextBackend::Bloom,
1884 },
1885 },
1886 };
1887 let new_meta = new_meta
1888 .builder_with_alter_kind("my_table", &alter_kind)
1889 .unwrap()
1890 .build()
1891 .unwrap();
1892 let column_schema = new_meta
1893 .schema
1894 .column_schema_by_name("my_tag_first")
1895 .unwrap();
1896 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1897 assert!(fulltext_options.enable);
1898 assert_eq!(
1899 datatypes::schema::FulltextAnalyzer::Chinese,
1900 fulltext_options.analyzer
1901 );
1902 assert!(fulltext_options.case_sensitive);
1903
1904 let alter_kind = AlterKind::UnsetIndex {
1905 options: UnsetIndexOptions::Fulltext {
1906 column_name: "my_tag_first".to_string(),
1907 },
1908 };
1909 let new_meta = new_meta
1910 .builder_with_alter_kind("my_table", &alter_kind)
1911 .unwrap()
1912 .build()
1913 .unwrap();
1914 let column_schema = new_meta
1915 .schema
1916 .column_schema_by_name("my_tag_first")
1917 .unwrap();
1918 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1919 assert!(!fulltext_options.enable);
1920 }
1921}