1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39 UnsetIndexOptions,
40};
41
/// Numeric identifier of a table.
pub type TableId = u32;
/// Version number of a table, stored in [`TableIdent::version`].
pub type TableVersion = u64;
44
/// Level of filter push-down support.
///
/// Variant-for-variant mirror of DataFusion's [`TableProviderFilterPushDown`];
/// the two types convert into each other through the `From` impls in this file.
/// See the DataFusion documentation for the precise meaning of each level.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// Counterpart of `TableProviderFilterPushDown::Unsupported`.
    Unsupported,
    /// Counterpart of `TableProviderFilterPushDown::Inexact`.
    Inexact,
    /// Counterpart of `TableProviderFilterPushDown::Exact`.
    Exact,
}
61
62impl From<TableProviderFilterPushDown> for FilterPushDownType {
63 fn from(value: TableProviderFilterPushDown) -> Self {
64 match value {
65 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
66 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
67 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
68 }
69 }
70}
71
72impl From<FilterPushDownType> for TableProviderFilterPushDown {
73 fn from(value: FilterPushDownType) -> Self {
74 match value {
75 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
76 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
77 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
78 }
79 }
80}
81
/// Kind of a table.
///
/// The `Display` impl renders each variant as an upper-case label.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// Displayed as `BASE TABLE`.
    Base,
    /// Displayed as `VIEW`.
    View,
    /// Displayed as `TEMPORARY`.
    Temporary,
}
92
93impl std::fmt::Display for TableType {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 TableType::Base => f.write_str("BASE TABLE"),
97 TableType::Temporary => f.write_str("TEMPORARY"),
98 TableType::View => f.write_str("VIEW"),
99 }
100 }
101}
102
/// Identifier of a table: its id plus a version number.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Id of the table.
    pub table_id: TableId,
    /// Version of the table; `TableIdent::new` starts it at 0.
    pub version: TableVersion,
}
112
/// Metadata of a table: schema, key layout, engine, regions and options.
///
/// `TableMetaBuilder` is generated with the `mutable` pattern and a custom
/// constructor, so builders are created through the helper constructors in this
/// file or from an existing `TableMeta`.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table.
    pub schema: SchemaRef,
    /// Positions of the primary key columns within `schema.column_schemas()`.
    pub primary_key_indices: Vec<usize>,
    /// Positions of the value columns. When not set on the builder, defaults to
    /// every column position that is not listed in `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Engine of the table (presumably the engine name — confirm with callers).
    #[builder(default, setter(into))]
    pub engine: String,
    /// Region numbers of the table; combined with the table id to form `RegionId`s.
    #[builder(default, setter(into))]
    pub region_numbers: Vec<u32>,
    /// Column id handed out by the next call to `alloc_new_column`.
    pub next_column_id: ColumnId,
    /// Table options.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation time; the builder defaults it to `Utc::now()`.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Positions of the partition key columns (presumably indices into the schema,
    /// like `primary_key_indices` — TODO confirm); defaults to empty.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
}
138
impl TableMetaBuilder {
    /// Creates a builder with every field unset.
    ///
    /// Only compiled for tests or when the `testing` feature is enabled.
    #[cfg(any(test, feature = "testing"))]
    pub fn empty() -> Self {
        Self {
            schema: None,
            primary_key_indices: None,
            value_indices: None,
            engine: None,
            region_numbers: None,
            next_column_id: None,
            options: None,
            created_on: None,
            partition_key_indices: None,
        }
    }
}
156
157impl TableMetaBuilder {
158 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
159 match (&self.primary_key_indices, &self.schema) {
160 (Some(v), Some(schema)) => {
161 let column_schemas = schema.column_schemas();
162 Ok((0..column_schemas.len())
163 .filter(|idx| !v.contains(idx))
164 .collect())
165 }
166 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
167 }
168 }
169
170 pub fn new_external_table() -> Self {
171 Self {
172 schema: None,
173 primary_key_indices: Some(Vec::new()),
174 value_indices: Some(Vec::new()),
175 engine: None,
176 region_numbers: Some(Vec::new()),
177 next_column_id: Some(0),
178 options: None,
179 created_on: None,
180 partition_key_indices: None,
181 }
182 }
183}
184
/// Add-column requests grouped by their requested location, as produced by
/// `TableMeta::split_requests_by_column_location`.
struct SplitResult<'a> {
    /// Requests whose location is `AddColumnLocation::First`, in request order.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Requests whose location is `AddColumnLocation::After`, keyed by the name of
    /// the existing column they follow, in request order.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Requests without a location, in request order.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all requested columns, in request order.
    column_names: Vec<String>,
}
196
197impl TableMeta {
198 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
199 let columns_schemas = &self.schema.column_schemas();
200 self.primary_key_indices
201 .iter()
202 .map(|idx| &columns_schemas[*idx].name)
203 }
204
205 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
206 let columns_schemas = self.schema.column_schemas();
208 let primary_key_indices = &self.primary_key_indices;
209 columns_schemas
210 .iter()
211 .enumerate()
212 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
213 .map(|(_, cs)| &cs.name)
214 }
215
216 pub fn builder_with_alter_kind(
220 &self,
221 table_name: &str,
222 alter_kind: &AlterKind,
223 ) -> Result<TableMetaBuilder> {
224 match alter_kind {
225 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
226 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
227 AlterKind::ModifyColumnTypes { columns } => {
228 self.modify_column_types(table_name, columns)
229 }
230 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
232 AlterKind::SetTableOptions { options } => self.set_table_options(options),
233 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
234 AlterKind::SetIndex { options } => match options {
235 SetIndexOptions::Fulltext {
236 column_name,
237 options,
238 } => self.change_column_fulltext_options(
239 table_name,
240 column_name,
241 true,
242 Some(options),
243 ),
244 SetIndexOptions::Inverted { column_name } => {
245 self.change_column_modify_inverted_index(table_name, column_name, true)
246 }
247 SetIndexOptions::Skipping {
248 column_name,
249 options,
250 } => self.change_column_skipping_index_options(
251 table_name,
252 column_name,
253 Some(options),
254 ),
255 },
256 AlterKind::UnsetIndex { options } => match options {
257 UnsetIndexOptions::Fulltext { column_name } => {
258 self.change_column_fulltext_options(table_name, column_name, false, None)
259 }
260 UnsetIndexOptions::Inverted { column_name } => {
261 self.change_column_modify_inverted_index(table_name, column_name, false)
262 }
263 UnsetIndexOptions::Skipping { column_name } => {
264 self.change_column_skipping_index_options(table_name, column_name, None)
265 }
266 },
267 }
268 }
269
270 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
272 let mut new_options = self.options.clone();
273
274 for request in requests {
275 match request {
276 SetRegionOption::Ttl(new_ttl) => {
277 new_options.ttl = *new_ttl;
278 }
279 SetRegionOption::Twsc(key, value) => {
280 if !value.is_empty() {
281 new_options
282 .extra_options
283 .insert(key.to_string(), value.to_string());
284 new_options.extra_options.insert(
286 COMPACTION_TYPE.to_string(),
287 COMPACTION_TYPE_TWCS.to_string(),
288 );
289 } else {
290 new_options.extra_options.remove(key.as_str());
292 }
293 }
294 }
295 }
296 let mut builder = self.new_meta_builder();
297 builder.options(new_options);
298
299 Ok(builder)
300 }
301
302 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
303 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
304 self.set_table_options(&requests)
305 }
306
307 fn change_column_modify_inverted_index(
309 &self,
310 table_name: &str,
311 column_name: &str,
312 value: bool,
313 ) -> Result<TableMetaBuilder> {
314 let table_schema = &self.schema;
315 let mut meta_builder = self.new_meta_builder();
316
317 let mut columns: Vec<ColumnSchema> =
318 Vec::with_capacity(table_schema.column_schemas().len());
319
320 for column_schema in table_schema.column_schemas().iter() {
321 if column_schema.name == column_name {
322 let mut new_column_schema = column_schema.clone();
323 new_column_schema.set_inverted_index(value);
324 columns.push(new_column_schema);
325 } else {
326 columns.push(column_schema.clone());
327 }
328 }
329
330 let mut builder = SchemaBuilder::try_from_columns(columns)
332 .with_context(|_| error::SchemaBuildSnafu {
333 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
334 })?
335 .version(table_schema.version() + 1);
336
337 for (k, v) in table_schema.metadata().iter() {
338 builder = builder.add_metadata(k, v);
339 }
340
341 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
342 msg: format!(
343 "Table {table_name} cannot change fulltext options for column {column_name}",
344 ),
345 })?;
346
347 let _ = meta_builder
348 .schema(Arc::new(new_schema))
349 .primary_key_indices(self.primary_key_indices.clone());
350
351 Ok(meta_builder)
352 }
353
354 fn change_column_fulltext_options(
356 &self,
357 table_name: &str,
358 column_name: &str,
359 enable: bool,
360 options: Option<&FulltextOptions>,
361 ) -> Result<TableMetaBuilder> {
362 let table_schema = &self.schema;
363 let mut meta_builder = self.new_meta_builder();
364
365 let column = &table_schema
366 .column_schema_by_name(column_name)
367 .with_context(|| error::ColumnNotExistsSnafu {
368 column_name,
369 table_name,
370 })?;
371
372 ensure!(
373 column.data_type.is_string(),
374 error::InvalidColumnOptionSnafu {
375 column_name,
376 msg: "FULLTEXT index only supports string type",
377 }
378 );
379
380 let current_fulltext_options = column
381 .fulltext_options()
382 .context(error::SetFulltextOptionsSnafu { column_name })?;
383
384 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
385 for column_schema in table_schema.column_schemas() {
386 if column_schema.name == column_name {
387 let mut new_column_schema = column_schema.clone();
388 if enable {
389 ensure!(
390 options.is_some(),
391 error::InvalidColumnOptionSnafu {
392 column_name,
393 msg: "FULLTEXT index options must be provided",
394 }
395 );
396 set_column_fulltext_options(
397 &mut new_column_schema,
398 column_name,
399 options.unwrap(),
400 current_fulltext_options.clone(),
401 )?
402 } else {
403 unset_column_fulltext_options(
404 &mut new_column_schema,
405 column_name,
406 current_fulltext_options.clone(),
407 )?
408 }
409 columns.push(new_column_schema);
410 } else {
411 columns.push(column_schema.clone());
412 }
413 }
414
415 let mut builder = SchemaBuilder::try_from_columns(columns)
417 .with_context(|_| error::SchemaBuildSnafu {
418 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
419 })?
420 .version(table_schema.version() + 1);
421
422 for (k, v) in table_schema.metadata().iter() {
423 builder = builder.add_metadata(k, v);
424 }
425
426 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
427 msg: format!(
428 "Table {table_name} cannot change fulltext options for column {column_name}",
429 ),
430 })?;
431
432 let _ = meta_builder
433 .schema(Arc::new(new_schema))
434 .primary_key_indices(self.primary_key_indices.clone());
435
436 Ok(meta_builder)
437 }
438
439 fn change_column_skipping_index_options(
441 &self,
442 table_name: &str,
443 column_name: &str,
444 options: Option<&SkippingIndexOptions>,
445 ) -> Result<TableMetaBuilder> {
446 let table_schema = &self.schema;
447 let mut meta_builder = self.new_meta_builder();
448
449 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
450 for column_schema in table_schema.column_schemas() {
451 if column_schema.name == column_name {
452 let mut new_column_schema = column_schema.clone();
453 if let Some(options) = options {
454 set_column_skipping_index_options(
455 &mut new_column_schema,
456 column_name,
457 options,
458 )?;
459 } else {
460 unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
461 }
462 columns.push(new_column_schema);
463 } else {
464 columns.push(column_schema.clone());
465 }
466 }
467
468 let mut builder = SchemaBuilder::try_from_columns(columns)
469 .with_context(|_| error::SchemaBuildSnafu {
470 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
471 })?
472 .version(table_schema.version() + 1);
473
474 for (k, v) in table_schema.metadata().iter() {
475 builder = builder.add_metadata(k, v);
476 }
477
478 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
479 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
480 })?;
481
482 let _ = meta_builder
483 .schema(Arc::new(new_schema))
484 .primary_key_indices(self.primary_key_indices.clone());
485
486 Ok(meta_builder)
487 }
488
489 pub fn alloc_new_column(
494 &mut self,
495 table_name: &str,
496 new_column: &ColumnSchema,
497 ) -> Result<ColumnDescriptor> {
498 let desc = ColumnDescriptorBuilder::new(
499 self.next_column_id as ColumnId,
500 &new_column.name,
501 new_column.data_type.clone(),
502 )
503 .is_nullable(new_column.is_nullable())
504 .default_constraint(new_column.default_constraint().cloned())
505 .build()
506 .context(error::BuildColumnDescriptorSnafu {
507 table_name,
508 column_name: &new_column.name,
509 })?;
510
511 self.next_column_id += 1;
513
514 Ok(desc)
515 }
516
517 fn new_meta_builder(&self) -> TableMetaBuilder {
519 let mut builder = TableMetaBuilder::from(self);
520 builder.value_indices = None;
522 builder
523 }
524
525 fn add_columns(
527 &self,
528 table_name: &str,
529 requests: &[AddColumnRequest],
530 ) -> Result<TableMetaBuilder> {
531 let table_schema = &self.schema;
532 let mut meta_builder = self.new_meta_builder();
533 let original_primary_key_indices: HashSet<&usize> =
534 self.primary_key_indices.iter().collect();
535
536 let mut names = HashSet::with_capacity(requests.len());
537 let mut new_columns = Vec::with_capacity(requests.len());
538 for col_to_add in requests {
539 if let Some(column_schema) =
540 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
541 {
542 ensure!(
544 col_to_add.add_if_not_exists,
545 error::ColumnExistsSnafu {
546 table_name,
547 column_name: &col_to_add.column_schema.name
548 },
549 );
550
551 ensure!(
553 column_schema.data_type == col_to_add.column_schema.data_type,
554 error::InvalidAlterRequestSnafu {
555 table: table_name,
556 err: format!(
557 "column {} already exists with different type {:?}",
558 col_to_add.column_schema.name, column_schema.data_type,
559 ),
560 }
561 );
562 } else {
563 ensure!(
566 names.insert(&col_to_add.column_schema.name),
567 error::InvalidAlterRequestSnafu {
568 table: table_name,
569 err: format!(
570 "add column {} more than once",
571 col_to_add.column_schema.name
572 ),
573 }
574 );
575
576 ensure!(
577 col_to_add.column_schema.is_nullable()
578 || col_to_add.column_schema.default_constraint().is_some(),
579 error::InvalidAlterRequestSnafu {
580 table: table_name,
581 err: format!(
582 "no default value for column {}",
583 col_to_add.column_schema.name
584 ),
585 },
586 );
587
588 new_columns.push(col_to_add.clone());
589 }
590 }
591 let requests = &new_columns[..];
592
593 let SplitResult {
594 columns_at_first,
595 columns_at_after,
596 columns_at_last,
597 column_names,
598 } = self.split_requests_by_column_location(table_name, requests)?;
599 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
600 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
601 columns_at_first.iter().rev().for_each(|request| {
603 if request.is_key {
604 primary_key_indices.push(columns.len());
606 }
607 columns.push(request.column_schema.clone());
608 });
609 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
611 if original_primary_key_indices.contains(&index) {
612 primary_key_indices.push(columns.len());
613 }
614 columns.push(column_schema.clone());
615 if let Some(requests) = columns_at_after.get(&column_schema.name) {
616 requests.iter().rev().for_each(|request| {
617 if request.is_key {
618 primary_key_indices.push(columns.len());
620 }
621 columns.push(request.column_schema.clone());
622 });
623 }
624 }
625 columns_at_last.iter().for_each(|request| {
627 if request.is_key {
628 primary_key_indices.push(columns.len());
630 }
631 columns.push(request.column_schema.clone());
632 });
633
634 let mut builder = SchemaBuilder::try_from(columns)
635 .with_context(|_| error::SchemaBuildSnafu {
636 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
637 })?
638 .version(table_schema.version() + 1);
640 for (k, v) in table_schema.metadata().iter() {
641 builder = builder.add_metadata(k, v);
642 }
643 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
644 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
645 })?;
646
647 let _ = meta_builder
649 .schema(Arc::new(new_schema))
650 .primary_key_indices(primary_key_indices);
651
652 Ok(meta_builder)
653 }
654
655 fn remove_columns(
656 &self,
657 table_name: &str,
658 column_names: &[String],
659 ) -> Result<TableMetaBuilder> {
660 let table_schema = &self.schema;
661 let column_names: HashSet<_> = column_names.iter().collect();
662 let mut meta_builder = self.new_meta_builder();
663
664 let timestamp_index = table_schema.timestamp_index();
665 for column_name in &column_names {
667 if let Some(index) = table_schema.column_index_by_name(column_name) {
668 ensure!(
671 !self.primary_key_indices.contains(&index),
672 error::RemoveColumnInIndexSnafu {
673 column_name: *column_name,
674 table_name,
675 }
676 );
677
678 if let Some(ts_index) = timestamp_index {
679 ensure!(
681 index != ts_index,
682 error::RemoveColumnInIndexSnafu {
683 column_name: table_schema.column_name_by_index(ts_index),
684 table_name,
685 }
686 );
687 }
688 } else {
689 return error::ColumnNotExistsSnafu {
690 column_name: *column_name,
691 table_name,
692 }
693 .fail()?;
694 }
695 }
696
697 let columns: Vec<_> = table_schema
699 .column_schemas()
700 .iter()
701 .filter(|column_schema| !column_names.contains(&column_schema.name))
702 .cloned()
703 .collect();
704
705 let mut builder = SchemaBuilder::try_from_columns(columns)
706 .with_context(|_| error::SchemaBuildSnafu {
707 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
708 })?
709 .version(table_schema.version() + 1);
711 for (k, v) in table_schema.metadata().iter() {
712 builder = builder.add_metadata(k, v);
713 }
714 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
715 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
716 })?;
717
718 let primary_key_indices = self
720 .primary_key_indices
721 .iter()
722 .map(|idx| table_schema.column_name_by_index(*idx))
723 .map(|name| new_schema.column_index_by_name(name).unwrap())
725 .collect();
726
727 let _ = meta_builder
728 .schema(Arc::new(new_schema))
729 .primary_key_indices(primary_key_indices);
730
731 Ok(meta_builder)
732 }
733
734 fn modify_column_types(
735 &self,
736 table_name: &str,
737 requests: &[ModifyColumnTypeRequest],
738 ) -> Result<TableMetaBuilder> {
739 let table_schema = &self.schema;
740 let mut meta_builder = self.new_meta_builder();
741
742 let mut modify_column_types = HashMap::with_capacity(requests.len());
743 let timestamp_index = table_schema.timestamp_index();
744
745 for col_to_change in requests {
746 let change_column_name = &col_to_change.column_name;
747
748 let index = table_schema
749 .column_index_by_name(change_column_name)
750 .with_context(|| error::ColumnNotExistsSnafu {
751 column_name: change_column_name,
752 table_name,
753 })?;
754
755 let column = &table_schema.column_schemas()[index];
756
757 ensure!(
758 !self.primary_key_indices.contains(&index),
759 error::InvalidAlterRequestSnafu {
760 table: table_name,
761 err: format!(
762 "Not allowed to change primary key index column '{}'",
763 column.name
764 )
765 }
766 );
767
768 if let Some(ts_index) = timestamp_index {
769 ensure!(
771 index != ts_index,
772 error::InvalidAlterRequestSnafu {
773 table: table_name,
774 err: format!(
775 "Not allowed to change timestamp index column '{}' datatype",
776 column.name
777 )
778 }
779 );
780 }
781
782 ensure!(
783 modify_column_types
784 .insert(&col_to_change.column_name, col_to_change)
785 .is_none(),
786 error::InvalidAlterRequestSnafu {
787 table: table_name,
788 err: format!(
789 "change column datatype {} more than once",
790 col_to_change.column_name
791 ),
792 }
793 );
794
795 ensure!(
796 column
797 .data_type
798 .can_arrow_type_cast_to(&col_to_change.target_type),
799 error::InvalidAlterRequestSnafu {
800 table: table_name,
801 err: format!(
802 "column '{}' cannot be cast automatically to type '{}'",
803 col_to_change.column_name, col_to_change.target_type,
804 ),
805 }
806 );
807
808 ensure!(
809 column.is_nullable(),
810 error::InvalidAlterRequestSnafu {
811 table: table_name,
812 err: format!(
813 "column '{}' must be nullable to ensure safe conversion.",
814 col_to_change.column_name,
815 ),
816 }
817 );
818 }
819 let columns: Vec<_> = table_schema
821 .column_schemas()
822 .iter()
823 .cloned()
824 .map(|mut column| {
825 if let Some(change_column) = modify_column_types.get(&column.name) {
826 column.data_type = change_column.target_type.clone();
827 }
828 column
829 })
830 .collect();
831
832 let mut builder = SchemaBuilder::try_from_columns(columns)
833 .with_context(|_| error::SchemaBuildSnafu {
834 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
835 })?
836 .version(table_schema.version() + 1);
838 for (k, v) in table_schema.metadata().iter() {
839 builder = builder.add_metadata(k, v);
840 }
841 let new_schema = builder.build().with_context(|_| {
842 let column_names: Vec<_> = requests
843 .iter()
844 .map(|request| &request.column_name)
845 .collect();
846
847 error::SchemaBuildSnafu {
848 msg: format!(
849 "Table {table_name} cannot change datatype with columns {column_names:?}"
850 ),
851 }
852 })?;
853
854 let _ = meta_builder
855 .schema(Arc::new(new_schema))
856 .primary_key_indices(self.primary_key_indices.clone());
857
858 Ok(meta_builder)
859 }
860
861 fn split_requests_by_column_location<'a>(
863 &self,
864 table_name: &str,
865 requests: &'a [AddColumnRequest],
866 ) -> Result<SplitResult<'a>> {
867 let table_schema = &self.schema;
868 let mut columns_at_first = Vec::new();
869 let mut columns_at_after = HashMap::new();
870 let mut columns_at_last = Vec::new();
871 let mut column_names = Vec::with_capacity(requests.len());
872 for request in requests {
873 let column_name = &request.column_schema.name;
875 column_names.push(column_name.clone());
876 ensure!(
877 table_schema.column_schema_by_name(column_name).is_none(),
878 error::ColumnExistsSnafu {
879 column_name,
880 table_name,
881 }
882 );
883 match request.location.as_ref() {
884 Some(AddColumnLocation::First) => {
885 columns_at_first.push(request);
886 }
887 Some(AddColumnLocation::After { column_name }) => {
888 ensure!(
889 table_schema.column_schema_by_name(column_name).is_some(),
890 error::ColumnNotExistsSnafu {
891 column_name,
892 table_name,
893 }
894 );
895 columns_at_after
896 .entry(column_name.clone())
897 .or_insert(Vec::new())
898 .push(request);
899 }
900 None => {
901 columns_at_last.push(request);
902 }
903 }
904 }
905 Ok(SplitResult {
906 columns_at_first,
907 columns_at_after,
908 columns_at_last,
909 column_names,
910 })
911 }
912}
913
/// Identity, naming and metadata of a table.
///
/// `TableInfoBuilder` is generated with the `owned` builder pattern.
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table; the builder defaults it to `TableIdent::default()`.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table; required by the builder.
    #[builder(setter(into))]
    pub name: String,
    /// Optional description of the table; the builder defaults it to `None`.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Catalog the table belongs to; the builder defaults it to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Schema the table belongs to; the builder defaults it to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table; required by the builder.
    pub meta: TableMeta,
    /// Kind of the table; the builder defaults it to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
934
/// Shared, reference-counted [`TableInfo`].
pub type TableInfoRef = Arc<TableInfo>;
936
937impl TableInfo {
938 pub fn table_id(&self) -> TableId {
939 self.ident.table_id
940 }
941
942 pub fn region_ids(&self) -> Vec<RegionId> {
943 self.meta
944 .region_numbers
945 .iter()
946 .map(|id| RegionId::new(self.table_id(), *id))
947 .collect()
948 }
949 pub fn full_table_name(&self) -> String {
951 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
952 }
953
954 pub fn is_physical_table(&self) -> bool {
956 self.meta
957 .options
958 .extra_options
959 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
960 }
961
962 pub fn is_ttl_instant_table(&self) -> bool {
964 self.meta
965 .options
966 .ttl
967 .map(|t| t.is_instant())
968 .unwrap_or(false)
969 }
970}
971
972impl TableInfoBuilder {
973 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
974 Self {
975 name: Some(name.into()),
976 meta: Some(meta),
977 ..Default::default()
978 }
979 }
980
981 pub fn table_id(mut self, id: TableId) -> Self {
982 let ident = self.ident.get_or_insert_with(TableIdent::default);
983 ident.table_id = id;
984 self
985 }
986
987 pub fn table_version(mut self, version: TableVersion) -> Self {
988 let ident = self.ident.get_or_insert_with(TableIdent::default);
989 ident.version = version;
990 self
991 }
992}
993
994impl TableIdent {
995 pub fn new(table_id: TableId) -> Self {
996 Self {
997 table_id,
998 version: 0,
999 }
1000 }
1001}
1002
1003impl From<TableId> for TableIdent {
1004 fn from(table_id: TableId) -> Self {
1005 Self::new(table_id)
1006 }
1007}
1008
/// Serializable counterpart of [`TableMeta`], holding the schema as a `RawSchema`.
///
/// Converts from `TableMeta` infallibly and back through `TryFrom`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct RawTableMeta {
    /// Raw form of the table schema.
    pub schema: RawSchema,
    /// Same as `TableMeta::primary_key_indices`.
    pub primary_key_indices: Vec<usize>,
    /// Same as `TableMeta::value_indices`.
    pub value_indices: Vec<usize>,
    /// Same as `TableMeta::engine`.
    pub engine: String,
    /// Same as `TableMeta::next_column_id`.
    pub next_column_id: ColumnId,
    /// Same as `TableMeta::region_numbers`.
    pub region_numbers: Vec<u32>,
    /// Same as `TableMeta::options`.
    pub options: TableOptions,
    /// Same as `TableMeta::created_on`.
    pub created_on: DateTime<Utc>,
    /// Same as `TableMeta::partition_key_indices`; deserializes to an empty list
    /// when the field is absent from the input.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
}
1030
1031impl From<TableMeta> for RawTableMeta {
1032 fn from(meta: TableMeta) -> RawTableMeta {
1033 RawTableMeta {
1034 schema: RawSchema::from(&*meta.schema),
1035 primary_key_indices: meta.primary_key_indices,
1036 value_indices: meta.value_indices,
1037 engine: meta.engine,
1038 next_column_id: meta.next_column_id,
1039 region_numbers: meta.region_numbers,
1040 options: meta.options,
1041 created_on: meta.created_on,
1042 partition_key_indices: meta.partition_key_indices,
1043 }
1044 }
1045}
1046
1047impl TryFrom<RawTableMeta> for TableMeta {
1048 type Error = ConvertError;
1049
1050 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1051 Ok(TableMeta {
1052 schema: Arc::new(Schema::try_from(raw.schema)?),
1053 primary_key_indices: raw.primary_key_indices,
1054 value_indices: raw.value_indices,
1055 engine: raw.engine,
1056 region_numbers: raw.region_numbers,
1057 next_column_id: raw.next_column_id,
1058 options: raw.options,
1059 created_on: raw.created_on,
1060 partition_key_indices: raw.partition_key_indices,
1061 })
1062 }
1063}
1064
/// Serializable counterpart of [`TableInfo`], holding the meta as a [`RawTableMeta`].
///
/// Converts from `TableInfo` infallibly and back through `TryFrom`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Same as `TableInfo::ident`.
    pub ident: TableIdent,
    /// Same as `TableInfo::name`.
    pub name: String,
    /// Same as `TableInfo::desc`.
    pub desc: Option<String>,
    /// Same as `TableInfo::catalog_name`.
    pub catalog_name: String,
    /// Same as `TableInfo::schema_name`.
    pub schema_name: String,
    /// Raw form of the table metadata.
    pub meta: RawTableMeta,
    /// Same as `TableInfo::table_type`.
    pub table_type: TableType,
}
1076
1077impl RawTableInfo {
1078 pub fn sort_columns(&mut self) {
1080 let column_schemas = &self.meta.schema.column_schemas;
1081 let primary_keys = self
1082 .meta
1083 .primary_key_indices
1084 .iter()
1085 .map(|index| column_schemas[*index].name.clone())
1086 .collect::<HashSet<_>>();
1087
1088 self.meta
1089 .schema
1090 .column_schemas
1091 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1092
1093 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1095 let mut timestamp_index = None;
1096 let mut value_indices =
1097 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
1098 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1099 if primary_keys.contains(&column_schema.name) {
1100 primary_key_indices.push(index);
1101 } else if column_schema.is_time_index() {
1102 timestamp_index = Some(index);
1103 } else {
1104 value_indices.push(index);
1105 }
1106 }
1107
1108 self.meta.schema.timestamp_index = timestamp_index;
1110 self.meta.primary_key_indices = primary_key_indices;
1111 self.meta.value_indices = value_indices;
1112 }
1113
1114 pub fn to_region_options(&self) -> HashMap<String, String> {
1118 HashMap::from(&self.meta.options)
1119 }
1120}
1121
1122impl From<TableInfo> for RawTableInfo {
1123 fn from(info: TableInfo) -> RawTableInfo {
1124 RawTableInfo {
1125 ident: info.ident,
1126 name: info.name,
1127 desc: info.desc,
1128 catalog_name: info.catalog_name,
1129 schema_name: info.schema_name,
1130 meta: RawTableMeta::from(info.meta),
1131 table_type: info.table_type,
1132 }
1133 }
1134}
1135
1136impl TryFrom<RawTableInfo> for TableInfo {
1137 type Error = ConvertError;
1138
1139 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1140 Ok(TableInfo {
1141 ident: raw.ident,
1142 name: raw.name,
1143 desc: raw.desc,
1144 catalog_name: raw.catalog_name,
1145 schema_name: raw.schema_name,
1146 meta: TableMeta::try_from(raw.meta)?,
1147 table_type: raw.table_type,
1148 })
1149 }
1150}
1151
1152fn set_column_fulltext_options(
1161 column_schema: &mut ColumnSchema,
1162 column_name: &str,
1163 options: &FulltextOptions,
1164 current_options: Option<FulltextOptions>,
1165) -> Result<()> {
1166 if let Some(current_options) = current_options {
1167 ensure!(
1168 current_options.analyzer == options.analyzer
1169 && current_options.case_sensitive == options.case_sensitive,
1170 error::InvalidColumnOptionSnafu {
1171 column_name,
1172 msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1173 current_options.analyzer, current_options.case_sensitive),
1174 }
1175 );
1176 }
1177
1178 column_schema
1179 .set_fulltext_options(options)
1180 .context(error::SetFulltextOptionsSnafu { column_name })?;
1181
1182 Ok(())
1183}
1184
1185fn unset_column_fulltext_options(
1186 column_schema: &mut ColumnSchema,
1187 column_name: &str,
1188 current_options: Option<FulltextOptions>,
1189) -> Result<()> {
1190 ensure!(
1191 current_options
1192 .as_ref()
1193 .is_some_and(|options| options.enable),
1194 error::InvalidColumnOptionSnafu {
1195 column_name,
1196 msg: "FULLTEXT index already disabled".to_string(),
1197 }
1198 );
1199
1200 let mut options = current_options.unwrap();
1201 options.enable = false;
1202 column_schema
1203 .set_fulltext_options(&options)
1204 .context(error::SetFulltextOptionsSnafu { column_name })?;
1205
1206 Ok(())
1207}
1208
1209fn set_column_skipping_index_options(
1210 column_schema: &mut ColumnSchema,
1211 column_name: &str,
1212 options: &SkippingIndexOptions,
1213) -> Result<()> {
1214 column_schema
1215 .set_skipping_options(options)
1216 .context(error::SetSkippingOptionsSnafu { column_name })?;
1217
1218 Ok(())
1219}
1220
1221fn unset_column_skipping_index_options(
1222 column_schema: &mut ColumnSchema,
1223 column_name: &str,
1224) -> Result<()> {
1225 column_schema
1226 .unset_skipping_options()
1227 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1228 Ok(())
1229}
1230
1231#[cfg(test)]
1232mod tests {
1233 use common_error::ext::ErrorExt;
1234 use common_error::status_code::StatusCode;
1235 use datatypes::data_type::ConcreteDataType;
1236 use datatypes::schema::{
1237 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1238 };
1239
1240 use super::*;
1241
1242 fn new_test_schema() -> Schema {
1244 let column_schemas = vec![
1245 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1246 ColumnSchema::new(
1247 "ts",
1248 ConcreteDataType::timestamp_millisecond_datatype(),
1249 false,
1250 )
1251 .with_time_index(true),
1252 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1253 ];
1254 SchemaBuilder::try_from(column_schemas)
1255 .unwrap()
1256 .version(123)
1257 .build()
1258 .unwrap()
1259 }
1260
1261 #[test]
1262 fn test_raw_convert() {
1263 let schema = Arc::new(new_test_schema());
1264 let meta = TableMetaBuilder::empty()
1265 .schema(schema)
1266 .primary_key_indices(vec![0])
1267 .engine("engine")
1268 .next_column_id(3)
1269 .build()
1270 .unwrap();
1271 let info = TableInfoBuilder::default()
1272 .table_id(10)
1273 .table_version(5)
1274 .name("mytable")
1275 .meta(meta)
1276 .build()
1277 .unwrap();
1278
1279 let raw = RawTableInfo::from(info.clone());
1280 let info_new = TableInfo::try_from(raw).unwrap();
1281
1282 assert_eq!(info, info_new);
1283 }
1284
1285 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1286 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1287 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1288 let alter_kind = AlterKind::AddColumns {
1289 columns: vec![
1290 AddColumnRequest {
1291 column_schema: new_tag,
1292 is_key: true,
1293 location: None,
1294 add_if_not_exists: false,
1295 },
1296 AddColumnRequest {
1297 column_schema: new_field,
1298 is_key: false,
1299 location: None,
1300 add_if_not_exists: false,
1301 },
1302 ],
1303 };
1304
1305 let builder = meta
1306 .builder_with_alter_kind("my_table", &alter_kind)
1307 .unwrap();
1308 builder.build().unwrap()
1309 }
1310
1311 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1312 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1313 let new_field = ColumnSchema::new(
1314 "my_field_after_ts",
1315 ConcreteDataType::string_datatype(),
1316 true,
1317 );
1318 let alter_kind = AlterKind::AddColumns {
1319 columns: vec![
1320 AddColumnRequest {
1321 column_schema: new_tag,
1322 is_key: true,
1323 location: Some(AddColumnLocation::First),
1324 add_if_not_exists: false,
1325 },
1326 AddColumnRequest {
1327 column_schema: new_field,
1328 is_key: false,
1329 location: Some(AddColumnLocation::After {
1330 column_name: "ts".to_string(),
1331 }),
1332 add_if_not_exists: false,
1333 },
1334 ],
1335 };
1336
1337 let builder = meta
1338 .builder_with_alter_kind("my_table", &alter_kind)
1339 .unwrap();
1340 builder.build().unwrap()
1341 }
1342
1343 #[test]
1344 fn test_add_columns() {
1345 let schema = Arc::new(new_test_schema());
1346 let meta = TableMetaBuilder::empty()
1347 .schema(schema)
1348 .primary_key_indices(vec![0])
1349 .engine("engine")
1350 .next_column_id(3)
1351 .build()
1352 .unwrap();
1353
1354 let new_meta = add_columns_to_meta(&meta);
1355 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1356
1357 let names: Vec<String> = new_meta
1358 .schema
1359 .column_schemas()
1360 .iter()
1361 .map(|column_schema| column_schema.name.clone())
1362 .collect();
1363 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1364 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1365 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1366 }
1367
1368 #[test]
1369 fn test_add_columns_multiple_times() {
1370 let schema = Arc::new(new_test_schema());
1371 let meta = TableMetaBuilder::empty()
1372 .schema(schema)
1373 .primary_key_indices(vec![0])
1374 .engine("engine")
1375 .next_column_id(3)
1376 .build()
1377 .unwrap();
1378
1379 let alter_kind = AlterKind::AddColumns {
1380 columns: vec![
1381 AddColumnRequest {
1382 column_schema: ColumnSchema::new(
1383 "col3",
1384 ConcreteDataType::int32_datatype(),
1385 true,
1386 ),
1387 is_key: true,
1388 location: None,
1389 add_if_not_exists: true,
1390 },
1391 AddColumnRequest {
1392 column_schema: ColumnSchema::new(
1393 "col3",
1394 ConcreteDataType::int32_datatype(),
1395 true,
1396 ),
1397 is_key: true,
1398 location: None,
1399 add_if_not_exists: true,
1400 },
1401 ],
1402 };
1403 let err = meta
1404 .builder_with_alter_kind("my_table", &alter_kind)
1405 .err()
1406 .unwrap();
1407 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1408 }
1409
1410 #[test]
1411 fn test_remove_columns() {
1412 let schema = Arc::new(new_test_schema());
1413 let meta = TableMetaBuilder::empty()
1414 .schema(schema.clone())
1415 .primary_key_indices(vec![0])
1416 .engine("engine")
1417 .next_column_id(3)
1418 .build()
1419 .unwrap();
1420 let meta = add_columns_to_meta(&meta);
1422
1423 let alter_kind = AlterKind::DropColumns {
1424 names: vec![String::from("col2"), String::from("my_field")],
1425 };
1426 let new_meta = meta
1427 .builder_with_alter_kind("my_table", &alter_kind)
1428 .unwrap()
1429 .build()
1430 .unwrap();
1431
1432 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1433
1434 let names: Vec<String> = new_meta
1435 .schema
1436 .column_schemas()
1437 .iter()
1438 .map(|column_schema| column_schema.name.clone())
1439 .collect();
1440 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1441 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1442 assert_eq!(&[1], &new_meta.value_indices[..]);
1443 assert_eq!(
1444 schema.timestamp_column(),
1445 new_meta.schema.timestamp_column()
1446 );
1447 }
1448
1449 #[test]
1450 fn test_remove_multiple_columns_before_timestamp() {
1451 let column_schemas = vec![
1452 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1453 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1454 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1455 ColumnSchema::new(
1456 "ts",
1457 ConcreteDataType::timestamp_millisecond_datatype(),
1458 false,
1459 )
1460 .with_time_index(true),
1461 ];
1462 let schema = Arc::new(
1463 SchemaBuilder::try_from(column_schemas)
1464 .unwrap()
1465 .version(123)
1466 .build()
1467 .unwrap(),
1468 );
1469 let meta = TableMetaBuilder::empty()
1470 .schema(schema.clone())
1471 .primary_key_indices(vec![1])
1472 .engine("engine")
1473 .next_column_id(4)
1474 .build()
1475 .unwrap();
1476
1477 let alter_kind = AlterKind::DropColumns {
1479 names: vec![String::from("col3"), String::from("col1")],
1480 };
1481 let new_meta = meta
1482 .builder_with_alter_kind("my_table", &alter_kind)
1483 .unwrap()
1484 .build()
1485 .unwrap();
1486
1487 let names: Vec<String> = new_meta
1488 .schema
1489 .column_schemas()
1490 .iter()
1491 .map(|column_schema| column_schema.name.clone())
1492 .collect();
1493 assert_eq!(&["col2", "ts"], &names[..]);
1494 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1495 assert_eq!(&[1], &new_meta.value_indices[..]);
1496 assert_eq!(
1497 schema.timestamp_column(),
1498 new_meta.schema.timestamp_column()
1499 );
1500 }
1501
1502 #[test]
1503 fn test_add_existing_column() {
1504 let schema = Arc::new(new_test_schema());
1505 let meta = TableMetaBuilder::empty()
1506 .schema(schema)
1507 .primary_key_indices(vec![0])
1508 .engine("engine")
1509 .next_column_id(3)
1510 .build()
1511 .unwrap();
1512
1513 let alter_kind = AlterKind::AddColumns {
1514 columns: vec![AddColumnRequest {
1515 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1516 is_key: false,
1517 location: None,
1518 add_if_not_exists: false,
1519 }],
1520 };
1521
1522 let err = meta
1523 .builder_with_alter_kind("my_table", &alter_kind)
1524 .err()
1525 .unwrap();
1526 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1527
1528 let alter_kind = AlterKind::AddColumns {
1530 columns: vec![AddColumnRequest {
1531 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1532 is_key: true,
1533 location: None,
1534 add_if_not_exists: true,
1535 }],
1536 };
1537 let new_meta = meta
1538 .builder_with_alter_kind("my_table", &alter_kind)
1539 .unwrap()
1540 .build()
1541 .unwrap();
1542 assert_eq!(
1543 meta.schema.column_schemas(),
1544 new_meta.schema.column_schemas()
1545 );
1546 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1547 }
1548
1549 #[test]
1550 fn test_add_different_type_column() {
1551 let schema = Arc::new(new_test_schema());
1552 let meta = TableMetaBuilder::empty()
1553 .schema(schema)
1554 .primary_key_indices(vec![0])
1555 .engine("engine")
1556 .next_column_id(3)
1557 .build()
1558 .unwrap();
1559
1560 let alter_kind = AlterKind::AddColumns {
1562 columns: vec![AddColumnRequest {
1563 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1564 is_key: false,
1565 location: None,
1566 add_if_not_exists: true,
1567 }],
1568 };
1569 let err = meta
1570 .builder_with_alter_kind("my_table", &alter_kind)
1571 .err()
1572 .unwrap();
1573 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1574 }
1575
1576 #[test]
1577 fn test_add_invalid_column() {
1578 let schema = Arc::new(new_test_schema());
1579 let meta = TableMetaBuilder::empty()
1580 .schema(schema)
1581 .primary_key_indices(vec![0])
1582 .engine("engine")
1583 .next_column_id(3)
1584 .build()
1585 .unwrap();
1586
1587 let alter_kind = AlterKind::AddColumns {
1589 columns: vec![AddColumnRequest {
1590 column_schema: ColumnSchema::new(
1591 "weny",
1592 ConcreteDataType::string_datatype(),
1593 false,
1594 ),
1595 is_key: false,
1596 location: None,
1597 add_if_not_exists: false,
1598 }],
1599 };
1600
1601 let err = meta
1602 .builder_with_alter_kind("my_table", &alter_kind)
1603 .err()
1604 .unwrap();
1605 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1606 }
1607
1608 #[test]
1609 fn test_remove_unknown_column() {
1610 let schema = Arc::new(new_test_schema());
1611 let meta = TableMetaBuilder::empty()
1612 .schema(schema)
1613 .primary_key_indices(vec![0])
1614 .engine("engine")
1615 .next_column_id(3)
1616 .build()
1617 .unwrap();
1618
1619 let alter_kind = AlterKind::DropColumns {
1620 names: vec![String::from("unknown")],
1621 };
1622
1623 let err = meta
1624 .builder_with_alter_kind("my_table", &alter_kind)
1625 .err()
1626 .unwrap();
1627 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1628 }
1629
1630 #[test]
1631 fn test_change_unknown_column_data_type() {
1632 let schema = Arc::new(new_test_schema());
1633 let meta = TableMetaBuilder::empty()
1634 .schema(schema)
1635 .primary_key_indices(vec![0])
1636 .engine("engine")
1637 .next_column_id(3)
1638 .build()
1639 .unwrap();
1640
1641 let alter_kind = AlterKind::ModifyColumnTypes {
1642 columns: vec![ModifyColumnTypeRequest {
1643 column_name: "unknown".to_string(),
1644 target_type: ConcreteDataType::string_datatype(),
1645 }],
1646 };
1647
1648 let err = meta
1649 .builder_with_alter_kind("my_table", &alter_kind)
1650 .err()
1651 .unwrap();
1652 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1653 }
1654
1655 #[test]
1656 fn test_remove_key_column() {
1657 let schema = Arc::new(new_test_schema());
1658 let meta = TableMetaBuilder::empty()
1659 .schema(schema)
1660 .primary_key_indices(vec![0])
1661 .engine("engine")
1662 .next_column_id(3)
1663 .build()
1664 .unwrap();
1665
1666 let alter_kind = AlterKind::DropColumns {
1668 names: vec![String::from("col1")],
1669 };
1670
1671 let err = meta
1672 .builder_with_alter_kind("my_table", &alter_kind)
1673 .err()
1674 .unwrap();
1675 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1676
1677 let alter_kind = AlterKind::DropColumns {
1679 names: vec![String::from("ts")],
1680 };
1681
1682 let err = meta
1683 .builder_with_alter_kind("my_table", &alter_kind)
1684 .err()
1685 .unwrap();
1686 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1687 }
1688
1689 #[test]
1690 fn test_change_key_column_data_type() {
1691 let schema = Arc::new(new_test_schema());
1692 let meta = TableMetaBuilder::empty()
1693 .schema(schema)
1694 .primary_key_indices(vec![0])
1695 .engine("engine")
1696 .next_column_id(3)
1697 .build()
1698 .unwrap();
1699
1700 let alter_kind = AlterKind::ModifyColumnTypes {
1702 columns: vec![ModifyColumnTypeRequest {
1703 column_name: "col1".to_string(),
1704 target_type: ConcreteDataType::string_datatype(),
1705 }],
1706 };
1707
1708 let err = meta
1709 .builder_with_alter_kind("my_table", &alter_kind)
1710 .err()
1711 .unwrap();
1712 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1713
1714 let alter_kind = AlterKind::ModifyColumnTypes {
1716 columns: vec![ModifyColumnTypeRequest {
1717 column_name: "ts".to_string(),
1718 target_type: ConcreteDataType::string_datatype(),
1719 }],
1720 };
1721
1722 let err = meta
1723 .builder_with_alter_kind("my_table", &alter_kind)
1724 .err()
1725 .unwrap();
1726 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1727 }
1728
1729 #[test]
1730 fn test_alloc_new_column() {
1731 let schema = Arc::new(new_test_schema());
1732 let mut meta = TableMetaBuilder::empty()
1733 .schema(schema)
1734 .primary_key_indices(vec![0])
1735 .engine("engine")
1736 .next_column_id(3)
1737 .build()
1738 .unwrap();
1739 assert_eq!(3, meta.next_column_id);
1740
1741 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1742 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1743
1744 assert_eq!(4, meta.next_column_id);
1745 assert_eq!(column_schema.name, desc.name);
1746 }
1747
1748 #[test]
1749 fn test_add_columns_with_location() {
1750 let schema = Arc::new(new_test_schema());
1751 let meta = TableMetaBuilder::empty()
1752 .schema(schema)
1753 .primary_key_indices(vec![0])
1754 .engine("engine")
1755 .next_column_id(3)
1756 .build()
1757 .unwrap();
1758
1759 let new_meta = add_columns_to_meta_with_location(&meta);
1760 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1761
1762 let names: Vec<String> = new_meta
1763 .schema
1764 .column_schemas()
1765 .iter()
1766 .map(|column_schema| column_schema.name.clone())
1767 .collect();
1768 assert_eq!(
1769 &["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"],
1770 &names[..]
1771 );
1772 assert_eq!(&[0, 1], &new_meta.primary_key_indices[..]);
1773 assert_eq!(&[2, 3, 4], &new_meta.value_indices[..]);
1774 }
1775
1776 #[test]
1777 fn test_modify_column_fulltext_options() {
1778 let schema = Arc::new(new_test_schema());
1779 let meta = TableMetaBuilder::empty()
1780 .schema(schema)
1781 .primary_key_indices(vec![0])
1782 .engine("engine")
1783 .next_column_id(3)
1784 .build()
1785 .unwrap();
1786
1787 let alter_kind = AlterKind::SetIndex {
1788 options: SetIndexOptions::Fulltext {
1789 column_name: "col1".to_string(),
1790 options: FulltextOptions::default(),
1791 },
1792 };
1793 let err = meta
1794 .builder_with_alter_kind("my_table", &alter_kind)
1795 .err()
1796 .unwrap();
1797 assert_eq!(
1798 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
1799 err.to_string()
1800 );
1801
1802 let new_meta = add_columns_to_meta_with_location(&meta);
1804 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1805
1806 let alter_kind = AlterKind::SetIndex {
1807 options: SetIndexOptions::Fulltext {
1808 column_name: "my_tag_first".to_string(),
1809 options: FulltextOptions {
1810 enable: true,
1811 analyzer: FulltextAnalyzer::Chinese,
1812 case_sensitive: true,
1813 backend: FulltextBackend::Bloom,
1814 },
1815 },
1816 };
1817 let new_meta = new_meta
1818 .builder_with_alter_kind("my_table", &alter_kind)
1819 .unwrap()
1820 .build()
1821 .unwrap();
1822 let column_schema = new_meta
1823 .schema
1824 .column_schema_by_name("my_tag_first")
1825 .unwrap();
1826 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1827 assert!(fulltext_options.enable);
1828 assert_eq!(
1829 datatypes::schema::FulltextAnalyzer::Chinese,
1830 fulltext_options.analyzer
1831 );
1832 assert!(fulltext_options.case_sensitive);
1833
1834 let alter_kind = AlterKind::UnsetIndex {
1835 options: UnsetIndexOptions::Fulltext {
1836 column_name: "my_tag_first".to_string(),
1837 },
1838 };
1839 let new_meta = new_meta
1840 .builder_with_alter_kind("my_table", &alter_kind)
1841 .unwrap()
1842 .build()
1843 .unwrap();
1844 let column_schema = new_meta
1845 .schema
1846 .column_schema_by_name("my_tag_first")
1847 .unwrap();
1848 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1849 assert!(!fulltext_options.enable);
1850 }
1851}