1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39 UnsetIndexOptions,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50 Unsupported,
52 Inexact,
57 Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
83#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86 Base,
88 View,
90 Temporary,
92}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
106pub struct TableIdent {
107 pub table_id: TableId,
109 pub version: TableVersion,
112}
113
114#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
118#[builder(pattern = "mutable", custom_constructor)]
119pub struct TableMeta {
120 pub schema: SchemaRef,
121 pub primary_key_indices: Vec<usize>,
124 #[builder(default = "self.default_value_indices()?")]
125 pub value_indices: Vec<usize>,
126 #[builder(default, setter(into))]
127 pub engine: String,
128 #[builder(default, setter(into))]
129 pub region_numbers: Vec<u32>,
130 pub next_column_id: ColumnId,
131 #[builder(default)]
133 pub options: TableOptions,
134 #[builder(default = "Utc::now()")]
135 pub created_on: DateTime<Utc>,
136 #[builder(default = "Vec::new()")]
137 pub partition_key_indices: Vec<usize>,
138 #[builder(default = "Vec::new()")]
139 pub column_ids: Vec<ColumnId>,
140}
141
142impl TableMetaBuilder {
143 #[cfg(any(test, feature = "testing"))]
145 pub fn empty() -> Self {
146 Self {
147 schema: None,
148 primary_key_indices: None,
149 value_indices: None,
150 engine: None,
151 region_numbers: None,
152 next_column_id: None,
153 options: None,
154 created_on: None,
155 partition_key_indices: None,
156 column_ids: None,
157 }
158 }
159}
160
161impl TableMetaBuilder {
162 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
163 match (&self.primary_key_indices, &self.schema) {
164 (Some(v), Some(schema)) => {
165 let column_schemas = schema.column_schemas();
166 Ok((0..column_schemas.len())
167 .filter(|idx| !v.contains(idx))
168 .collect())
169 }
170 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
171 }
172 }
173
174 pub fn new_external_table() -> Self {
175 Self {
176 schema: None,
177 primary_key_indices: Some(Vec::new()),
178 value_indices: Some(Vec::new()),
179 engine: None,
180 region_numbers: Some(Vec::new()),
181 next_column_id: Some(0),
182 options: None,
183 created_on: None,
184 partition_key_indices: None,
185 column_ids: None,
186 }
187 }
188}
189
190struct SplitResult<'a> {
192 columns_at_first: Vec<&'a AddColumnRequest>,
194 columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
196 columns_at_last: Vec<&'a AddColumnRequest>,
198 column_names: Vec<String>,
200}
201
202impl TableMeta {
203 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
204 let columns_schemas = &self.schema.column_schemas();
205 self.primary_key_indices
206 .iter()
207 .map(|idx| &columns_schemas[*idx].name)
208 }
209
210 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
211 let columns_schemas = self.schema.column_schemas();
213 let primary_key_indices = &self.primary_key_indices;
214 columns_schemas
215 .iter()
216 .enumerate()
217 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
218 .map(|(_, cs)| &cs.name)
219 }
220
221 pub fn builder_with_alter_kind(
225 &self,
226 table_name: &str,
227 alter_kind: &AlterKind,
228 ) -> Result<TableMetaBuilder> {
229 match alter_kind {
230 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
231 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
232 AlterKind::ModifyColumnTypes { columns } => {
233 self.modify_column_types(table_name, columns)
234 }
235 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
237 AlterKind::SetTableOptions { options } => self.set_table_options(options),
238 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
239 AlterKind::SetIndex { options } => match options {
240 SetIndexOptions::Fulltext {
241 column_name,
242 options,
243 } => self.change_column_fulltext_options(
244 table_name,
245 column_name,
246 true,
247 Some(options),
248 ),
249 SetIndexOptions::Inverted { column_name } => {
250 self.change_column_modify_inverted_index(table_name, column_name, true)
251 }
252 SetIndexOptions::Skipping {
253 column_name,
254 options,
255 } => self.change_column_skipping_index_options(
256 table_name,
257 column_name,
258 Some(options),
259 ),
260 },
261 AlterKind::UnsetIndex { options } => match options {
262 UnsetIndexOptions::Fulltext { column_name } => {
263 self.change_column_fulltext_options(table_name, column_name, false, None)
264 }
265 UnsetIndexOptions::Inverted { column_name } => {
266 self.change_column_modify_inverted_index(table_name, column_name, false)
267 }
268 UnsetIndexOptions::Skipping { column_name } => {
269 self.change_column_skipping_index_options(table_name, column_name, None)
270 }
271 },
272 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
273 }
274 }
275
276 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
278 let mut new_options = self.options.clone();
279
280 for request in requests {
281 match request {
282 SetRegionOption::Ttl(new_ttl) => {
283 new_options.ttl = *new_ttl;
284 }
285 SetRegionOption::Twsc(key, value) => {
286 if !value.is_empty() {
287 new_options
288 .extra_options
289 .insert(key.to_string(), value.to_string());
290 new_options.extra_options.insert(
292 COMPACTION_TYPE.to_string(),
293 COMPACTION_TYPE_TWCS.to_string(),
294 );
295 } else {
296 new_options.extra_options.remove(key.as_str());
298 }
299 }
300 }
301 }
302 let mut builder = self.new_meta_builder();
303 builder.options(new_options);
304
305 Ok(builder)
306 }
307
308 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
309 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
310 self.set_table_options(&requests)
311 }
312
313 fn change_column_modify_inverted_index(
315 &self,
316 table_name: &str,
317 column_name: &str,
318 value: bool,
319 ) -> Result<TableMetaBuilder> {
320 let table_schema = &self.schema;
321 let mut meta_builder = self.new_meta_builder();
322
323 let mut columns: Vec<ColumnSchema> =
324 Vec::with_capacity(table_schema.column_schemas().len());
325
326 for column_schema in table_schema.column_schemas().iter() {
327 if column_schema.name == column_name {
328 let mut new_column_schema = column_schema.clone();
329 new_column_schema.set_inverted_index(value);
330 columns.push(new_column_schema);
331 } else {
332 columns.push(column_schema.clone());
333 }
334 }
335
336 let mut builder = SchemaBuilder::try_from_columns(columns)
338 .with_context(|_| error::SchemaBuildSnafu {
339 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
340 })?
341 .version(table_schema.version() + 1);
342
343 for (k, v) in table_schema.metadata().iter() {
344 builder = builder.add_metadata(k, v);
345 }
346
347 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
348 msg: format!(
349 "Table {table_name} cannot change fulltext options for column {column_name}",
350 ),
351 })?;
352
353 let _ = meta_builder
354 .schema(Arc::new(new_schema))
355 .primary_key_indices(self.primary_key_indices.clone());
356
357 Ok(meta_builder)
358 }
359
360 fn change_column_fulltext_options(
362 &self,
363 table_name: &str,
364 column_name: &str,
365 enable: bool,
366 options: Option<&FulltextOptions>,
367 ) -> Result<TableMetaBuilder> {
368 let table_schema = &self.schema;
369 let mut meta_builder = self.new_meta_builder();
370
371 let column = &table_schema
372 .column_schema_by_name(column_name)
373 .with_context(|| error::ColumnNotExistsSnafu {
374 column_name,
375 table_name,
376 })?;
377
378 ensure!(
379 column.data_type.is_string(),
380 error::InvalidColumnOptionSnafu {
381 column_name,
382 msg: "FULLTEXT index only supports string type",
383 }
384 );
385
386 let current_fulltext_options = column
387 .fulltext_options()
388 .context(error::SetFulltextOptionsSnafu { column_name })?;
389
390 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
391 for column_schema in table_schema.column_schemas() {
392 if column_schema.name == column_name {
393 let mut new_column_schema = column_schema.clone();
394 if enable {
395 ensure!(
396 options.is_some(),
397 error::InvalidColumnOptionSnafu {
398 column_name,
399 msg: "FULLTEXT index options must be provided",
400 }
401 );
402 set_column_fulltext_options(
403 &mut new_column_schema,
404 column_name,
405 options.unwrap(),
406 current_fulltext_options.clone(),
407 )?
408 } else {
409 unset_column_fulltext_options(
410 &mut new_column_schema,
411 column_name,
412 current_fulltext_options.clone(),
413 )?
414 }
415 columns.push(new_column_schema);
416 } else {
417 columns.push(column_schema.clone());
418 }
419 }
420
421 let mut builder = SchemaBuilder::try_from_columns(columns)
423 .with_context(|_| error::SchemaBuildSnafu {
424 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
425 })?
426 .version(table_schema.version() + 1);
427
428 for (k, v) in table_schema.metadata().iter() {
429 builder = builder.add_metadata(k, v);
430 }
431
432 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
433 msg: format!(
434 "Table {table_name} cannot change fulltext options for column {column_name}",
435 ),
436 })?;
437
438 let _ = meta_builder
439 .schema(Arc::new(new_schema))
440 .primary_key_indices(self.primary_key_indices.clone());
441
442 Ok(meta_builder)
443 }
444
445 fn change_column_skipping_index_options(
447 &self,
448 table_name: &str,
449 column_name: &str,
450 options: Option<&SkippingIndexOptions>,
451 ) -> Result<TableMetaBuilder> {
452 let table_schema = &self.schema;
453 let mut meta_builder = self.new_meta_builder();
454
455 let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
456 for column_schema in table_schema.column_schemas() {
457 if column_schema.name == column_name {
458 let mut new_column_schema = column_schema.clone();
459 if let Some(options) = options {
460 set_column_skipping_index_options(
461 &mut new_column_schema,
462 column_name,
463 options,
464 )?;
465 } else {
466 unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
467 }
468 columns.push(new_column_schema);
469 } else {
470 columns.push(column_schema.clone());
471 }
472 }
473
474 let mut builder = SchemaBuilder::try_from_columns(columns)
475 .with_context(|_| error::SchemaBuildSnafu {
476 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
477 })?
478 .version(table_schema.version() + 1);
479
480 for (k, v) in table_schema.metadata().iter() {
481 builder = builder.add_metadata(k, v);
482 }
483
484 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
485 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
486 })?;
487
488 let _ = meta_builder
489 .schema(Arc::new(new_schema))
490 .primary_key_indices(self.primary_key_indices.clone());
491
492 Ok(meta_builder)
493 }
494
495 pub fn alloc_new_column(
500 &mut self,
501 table_name: &str,
502 new_column: &ColumnSchema,
503 ) -> Result<ColumnDescriptor> {
504 let desc = ColumnDescriptorBuilder::new(
505 self.next_column_id as ColumnId,
506 &new_column.name,
507 new_column.data_type.clone(),
508 )
509 .is_nullable(new_column.is_nullable())
510 .default_constraint(new_column.default_constraint().cloned())
511 .build()
512 .context(error::BuildColumnDescriptorSnafu {
513 table_name,
514 column_name: &new_column.name,
515 })?;
516
517 self.next_column_id += 1;
519
520 Ok(desc)
521 }
522
523 fn new_meta_builder(&self) -> TableMetaBuilder {
525 let mut builder = TableMetaBuilder::from(self);
526 builder.value_indices = None;
528 builder
529 }
530
531 fn add_columns(
533 &self,
534 table_name: &str,
535 requests: &[AddColumnRequest],
536 ) -> Result<TableMetaBuilder> {
537 let table_schema = &self.schema;
538 let mut meta_builder = self.new_meta_builder();
539 let original_primary_key_indices: HashSet<&usize> =
540 self.primary_key_indices.iter().collect();
541
542 let mut names = HashSet::with_capacity(requests.len());
543 let mut new_columns = Vec::with_capacity(requests.len());
544 for col_to_add in requests {
545 if let Some(column_schema) =
546 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
547 {
548 ensure!(
550 col_to_add.add_if_not_exists,
551 error::ColumnExistsSnafu {
552 table_name,
553 column_name: &col_to_add.column_schema.name
554 },
555 );
556
557 ensure!(
559 column_schema.data_type == col_to_add.column_schema.data_type,
560 error::InvalidAlterRequestSnafu {
561 table: table_name,
562 err: format!(
563 "column {} already exists with different type {:?}",
564 col_to_add.column_schema.name, column_schema.data_type,
565 ),
566 }
567 );
568 } else {
569 ensure!(
572 names.insert(&col_to_add.column_schema.name),
573 error::InvalidAlterRequestSnafu {
574 table: table_name,
575 err: format!(
576 "add column {} more than once",
577 col_to_add.column_schema.name
578 ),
579 }
580 );
581
582 ensure!(
583 col_to_add.column_schema.is_nullable()
584 || col_to_add.column_schema.default_constraint().is_some(),
585 error::InvalidAlterRequestSnafu {
586 table: table_name,
587 err: format!(
588 "no default value for column {}",
589 col_to_add.column_schema.name
590 ),
591 },
592 );
593
594 new_columns.push(col_to_add.clone());
595 }
596 }
597 let requests = &new_columns[..];
598
599 let SplitResult {
600 columns_at_first,
601 columns_at_after,
602 columns_at_last,
603 column_names,
604 } = self.split_requests_by_column_location(table_name, requests)?;
605 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
606 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
607 columns_at_first.iter().rev().for_each(|request| {
609 if request.is_key {
610 primary_key_indices.push(columns.len());
612 }
613 columns.push(request.column_schema.clone());
614 });
615 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
617 if original_primary_key_indices.contains(&index) {
618 primary_key_indices.push(columns.len());
619 }
620 columns.push(column_schema.clone());
621 if let Some(requests) = columns_at_after.get(&column_schema.name) {
622 requests.iter().rev().for_each(|request| {
623 if request.is_key {
624 primary_key_indices.push(columns.len());
626 }
627 columns.push(request.column_schema.clone());
628 });
629 }
630 }
631 columns_at_last.iter().for_each(|request| {
633 if request.is_key {
634 primary_key_indices.push(columns.len());
636 }
637 columns.push(request.column_schema.clone());
638 });
639
640 let mut builder = SchemaBuilder::try_from(columns)
641 .with_context(|_| error::SchemaBuildSnafu {
642 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
643 })?
644 .version(table_schema.version() + 1);
646 for (k, v) in table_schema.metadata().iter() {
647 builder = builder.add_metadata(k, v);
648 }
649 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
650 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
651 })?;
652
653 let partition_key_indices = self
654 .partition_key_indices
655 .iter()
656 .map(|idx| table_schema.column_name_by_index(*idx))
657 .map(|name| new_schema.column_index_by_name(name).unwrap())
659 .collect();
660
661 let _ = meta_builder
663 .schema(Arc::new(new_schema))
664 .primary_key_indices(primary_key_indices)
665 .partition_key_indices(partition_key_indices);
666
667 Ok(meta_builder)
668 }
669
670 fn remove_columns(
671 &self,
672 table_name: &str,
673 column_names: &[String],
674 ) -> Result<TableMetaBuilder> {
675 let table_schema = &self.schema;
676 let column_names: HashSet<_> = column_names.iter().collect();
677 let mut meta_builder = self.new_meta_builder();
678
679 let timestamp_index = table_schema.timestamp_index();
680 for column_name in &column_names {
682 if let Some(index) = table_schema.column_index_by_name(column_name) {
683 ensure!(
686 !self.primary_key_indices.contains(&index),
687 error::RemoveColumnInIndexSnafu {
688 column_name: *column_name,
689 table_name,
690 }
691 );
692
693 ensure!(
694 !self.partition_key_indices.contains(&index),
695 error::RemovePartitionColumnSnafu {
696 column_name: *column_name,
697 table_name,
698 }
699 );
700
701 if let Some(ts_index) = timestamp_index {
702 ensure!(
704 index != ts_index,
705 error::RemoveColumnInIndexSnafu {
706 column_name: table_schema.column_name_by_index(ts_index),
707 table_name,
708 }
709 );
710 }
711 } else {
712 return error::ColumnNotExistsSnafu {
713 column_name: *column_name,
714 table_name,
715 }
716 .fail()?;
717 }
718 }
719
720 let columns: Vec<_> = table_schema
722 .column_schemas()
723 .iter()
724 .filter(|column_schema| !column_names.contains(&column_schema.name))
725 .cloned()
726 .collect();
727
728 let mut builder = SchemaBuilder::try_from_columns(columns)
729 .with_context(|_| error::SchemaBuildSnafu {
730 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
731 })?
732 .version(table_schema.version() + 1);
734 for (k, v) in table_schema.metadata().iter() {
735 builder = builder.add_metadata(k, v);
736 }
737 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
738 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
739 })?;
740
741 let primary_key_indices = self
743 .primary_key_indices
744 .iter()
745 .map(|idx| table_schema.column_name_by_index(*idx))
746 .map(|name| new_schema.column_index_by_name(name).unwrap())
748 .collect();
749
750 let partition_key_indices = self
751 .partition_key_indices
752 .iter()
753 .map(|idx| table_schema.column_name_by_index(*idx))
754 .map(|name| new_schema.column_index_by_name(name).unwrap())
756 .collect();
757
758 let _ = meta_builder
759 .schema(Arc::new(new_schema))
760 .primary_key_indices(primary_key_indices)
761 .partition_key_indices(partition_key_indices);
762
763 Ok(meta_builder)
764 }
765
766 fn modify_column_types(
767 &self,
768 table_name: &str,
769 requests: &[ModifyColumnTypeRequest],
770 ) -> Result<TableMetaBuilder> {
771 let table_schema = &self.schema;
772 let mut meta_builder = self.new_meta_builder();
773
774 let mut modify_column_types = HashMap::with_capacity(requests.len());
775 let timestamp_index = table_schema.timestamp_index();
776
777 for col_to_change in requests {
778 let change_column_name = &col_to_change.column_name;
779
780 let index = table_schema
781 .column_index_by_name(change_column_name)
782 .with_context(|| error::ColumnNotExistsSnafu {
783 column_name: change_column_name,
784 table_name,
785 })?;
786
787 let column = &table_schema.column_schemas()[index];
788
789 ensure!(
790 !self.primary_key_indices.contains(&index),
791 error::InvalidAlterRequestSnafu {
792 table: table_name,
793 err: format!(
794 "Not allowed to change primary key index column '{}'",
795 column.name
796 )
797 }
798 );
799
800 if let Some(ts_index) = timestamp_index {
801 ensure!(
803 index != ts_index,
804 error::InvalidAlterRequestSnafu {
805 table: table_name,
806 err: format!(
807 "Not allowed to change timestamp index column '{}' datatype",
808 column.name
809 )
810 }
811 );
812 }
813
814 ensure!(
815 modify_column_types
816 .insert(&col_to_change.column_name, col_to_change)
817 .is_none(),
818 error::InvalidAlterRequestSnafu {
819 table: table_name,
820 err: format!(
821 "change column datatype {} more than once",
822 col_to_change.column_name
823 ),
824 }
825 );
826
827 ensure!(
828 column
829 .data_type
830 .can_arrow_type_cast_to(&col_to_change.target_type),
831 error::InvalidAlterRequestSnafu {
832 table: table_name,
833 err: format!(
834 "column '{}' cannot be cast automatically to type '{}'",
835 col_to_change.column_name, col_to_change.target_type,
836 ),
837 }
838 );
839
840 ensure!(
841 column.is_nullable(),
842 error::InvalidAlterRequestSnafu {
843 table: table_name,
844 err: format!(
845 "column '{}' must be nullable to ensure safe conversion.",
846 col_to_change.column_name,
847 ),
848 }
849 );
850 }
851 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
854 for mut column in table_schema.column_schemas().iter().cloned() {
855 if let Some(change_column) = modify_column_types.get(&column.name) {
856 column.data_type = change_column.target_type.clone();
857 let new_default = if let Some(default_value) = column.default_constraint() {
858 Some(
859 default_value
860 .cast_to_datatype(&change_column.target_type)
861 .with_context(|_| error::CastDefaultValueSnafu {
862 reason: format!(
863 "Failed to cast default value from {:?} to type {:?}",
864 default_value, &change_column.target_type
865 ),
866 })?,
867 )
868 } else {
869 None
870 };
871 column = column
872 .clone()
873 .with_default_constraint(new_default.clone())
874 .with_context(|_| error::CastDefaultValueSnafu {
875 reason: format!("Failed to set new default: {:?}", new_default),
876 })?;
877 }
878 columns.push(column)
879 }
880
881 let mut builder = SchemaBuilder::try_from_columns(columns)
882 .with_context(|_| error::SchemaBuildSnafu {
883 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
884 })?
885 .version(table_schema.version() + 1);
887 for (k, v) in table_schema.metadata().iter() {
888 builder = builder.add_metadata(k, v);
889 }
890 let new_schema = builder.build().with_context(|_| {
891 let column_names: Vec<_> = requests
892 .iter()
893 .map(|request| &request.column_name)
894 .collect();
895
896 error::SchemaBuildSnafu {
897 msg: format!(
898 "Table {table_name} cannot change datatype with columns {column_names:?}"
899 ),
900 }
901 })?;
902
903 let _ = meta_builder
904 .schema(Arc::new(new_schema))
905 .primary_key_indices(self.primary_key_indices.clone());
906
907 Ok(meta_builder)
908 }
909
910 fn split_requests_by_column_location<'a>(
912 &self,
913 table_name: &str,
914 requests: &'a [AddColumnRequest],
915 ) -> Result<SplitResult<'a>> {
916 let table_schema = &self.schema;
917 let mut columns_at_first = Vec::new();
918 let mut columns_at_after = HashMap::new();
919 let mut columns_at_last = Vec::new();
920 let mut column_names = Vec::with_capacity(requests.len());
921 for request in requests {
922 let column_name = &request.column_schema.name;
924 column_names.push(column_name.clone());
925 ensure!(
926 table_schema.column_schema_by_name(column_name).is_none(),
927 error::ColumnExistsSnafu {
928 column_name,
929 table_name,
930 }
931 );
932 match request.location.as_ref() {
933 Some(AddColumnLocation::First) => {
934 columns_at_first.push(request);
935 }
936 Some(AddColumnLocation::After { column_name }) => {
937 ensure!(
938 table_schema.column_schema_by_name(column_name).is_some(),
939 error::ColumnNotExistsSnafu {
940 column_name,
941 table_name,
942 }
943 );
944 columns_at_after
945 .entry(column_name.clone())
946 .or_insert(Vec::new())
947 .push(request);
948 }
949 None => {
950 columns_at_last.push(request);
951 }
952 }
953 }
954 Ok(SplitResult {
955 columns_at_first,
956 columns_at_after,
957 columns_at_last,
958 column_names,
959 })
960 }
961
962 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
963 let table_schema = &self.schema;
964 let mut meta_builder = self.new_meta_builder();
965 let mut columns = Vec::with_capacity(table_schema.num_columns());
966 for column_schema in table_schema.column_schemas() {
967 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
968 ensure!(
970 column_schema.default_constraint().is_some(),
971 error::InvalidAlterRequestSnafu {
972 table: table_name,
973 err: format!("column {name} does not have a default value"),
974 }
975 );
976 if !column_schema.is_nullable() {
977 return error::InvalidAlterRequestSnafu {
978 table: table_name,
979 err: format!(
980 "column {name} is not nullable and `default` cannot be dropped",
981 ),
982 }
983 .fail();
984 }
985 let new_column_schema = column_schema.clone();
986 let new_column_schema = new_column_schema
987 .with_default_constraint(None)
988 .with_context(|_| error::SchemaBuildSnafu {
989 msg: format!("Table {table_name} cannot drop default values"),
990 })?;
991 columns.push(new_column_schema);
992 } else {
993 columns.push(column_schema.clone());
994 }
995 }
996
997 let mut builder = SchemaBuilder::try_from_columns(columns)
998 .with_context(|_| error::SchemaBuildSnafu {
999 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1000 })?
1001 .version(table_schema.version() + 1);
1003 for (k, v) in table_schema.metadata().iter() {
1004 builder = builder.add_metadata(k, v);
1005 }
1006 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1007 msg: format!("Table {table_name} cannot drop default values"),
1008 })?;
1009
1010 let _ = meta_builder.schema(Arc::new(new_schema));
1011
1012 Ok(meta_builder)
1013 }
1014}
1015#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1016#[builder(pattern = "owned")]
1017pub struct TableInfo {
1018 #[builder(default, setter(into))]
1020 pub ident: TableIdent,
1021 #[builder(setter(into))]
1023 pub name: String,
1024 #[builder(default, setter(into))]
1026 pub desc: Option<String>,
1027 #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1028 pub catalog_name: String,
1029 #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1030 pub schema_name: String,
1031 pub meta: TableMeta,
1032 #[builder(default = "TableType::Base")]
1033 pub table_type: TableType,
1034}
1035
1036pub type TableInfoRef = Arc<TableInfo>;
1037
1038impl TableInfo {
1039 pub fn table_id(&self) -> TableId {
1040 self.ident.table_id
1041 }
1042
1043 pub fn region_ids(&self) -> Vec<RegionId> {
1044 self.meta
1045 .region_numbers
1046 .iter()
1047 .map(|id| RegionId::new(self.table_id(), *id))
1048 .collect()
1049 }
1050 pub fn full_table_name(&self) -> String {
1052 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1053 }
1054
1055 pub fn get_db_string(&self) -> String {
1056 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1057 }
1058
1059 pub fn is_physical_table(&self) -> bool {
1061 self.meta
1062 .options
1063 .extra_options
1064 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1065 }
1066
1067 pub fn is_ttl_instant_table(&self) -> bool {
1069 self.meta
1070 .options
1071 .ttl
1072 .map(|t| t.is_instant())
1073 .unwrap_or(false)
1074 }
1075}
1076
1077impl TableInfoBuilder {
1078 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1079 Self {
1080 name: Some(name.into()),
1081 meta: Some(meta),
1082 ..Default::default()
1083 }
1084 }
1085
1086 pub fn table_id(mut self, id: TableId) -> Self {
1087 let ident = self.ident.get_or_insert_with(TableIdent::default);
1088 ident.table_id = id;
1089 self
1090 }
1091
1092 pub fn table_version(mut self, version: TableVersion) -> Self {
1093 let ident = self.ident.get_or_insert_with(TableIdent::default);
1094 ident.version = version;
1095 self
1096 }
1097}
1098
1099impl TableIdent {
1100 pub fn new(table_id: TableId) -> Self {
1101 Self {
1102 table_id,
1103 version: 0,
1104 }
1105 }
1106}
1107
1108impl From<TableId> for TableIdent {
1109 fn from(table_id: TableId) -> Self {
1110 Self::new(table_id)
1111 }
1112}
1113
1114#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1116pub struct RawTableMeta {
1117 pub schema: RawSchema,
1118 pub primary_key_indices: Vec<usize>,
1121 pub value_indices: Vec<usize>,
1124 pub engine: String,
1126 pub next_column_id: ColumnId,
1129 pub region_numbers: Vec<u32>,
1130 pub options: TableOptions,
1131 pub created_on: DateTime<Utc>,
1132 #[serde(default)]
1134 pub partition_key_indices: Vec<usize>,
1135 #[serde(default)]
1138 pub column_ids: Vec<ColumnId>,
1139}
1140
1141impl From<TableMeta> for RawTableMeta {
1142 fn from(meta: TableMeta) -> RawTableMeta {
1143 RawTableMeta {
1144 schema: RawSchema::from(&*meta.schema),
1145 primary_key_indices: meta.primary_key_indices,
1146 value_indices: meta.value_indices,
1147 engine: meta.engine,
1148 next_column_id: meta.next_column_id,
1149 region_numbers: meta.region_numbers,
1150 options: meta.options,
1151 created_on: meta.created_on,
1152 partition_key_indices: meta.partition_key_indices,
1153 column_ids: meta.column_ids,
1154 }
1155 }
1156}
1157
1158impl TryFrom<RawTableMeta> for TableMeta {
1159 type Error = ConvertError;
1160
1161 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1162 Ok(TableMeta {
1163 schema: Arc::new(Schema::try_from(raw.schema)?),
1164 primary_key_indices: raw.primary_key_indices,
1165 value_indices: raw.value_indices,
1166 engine: raw.engine,
1167 region_numbers: raw.region_numbers,
1168 next_column_id: raw.next_column_id,
1169 options: raw.options,
1170 created_on: raw.created_on,
1171 partition_key_indices: raw.partition_key_indices,
1172 column_ids: raw.column_ids,
1173 })
1174 }
1175}
1176
1177#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1179pub struct RawTableInfo {
1180 pub ident: TableIdent,
1181 pub name: String,
1182 pub desc: Option<String>,
1183 pub catalog_name: String,
1184 pub schema_name: String,
1185 pub meta: RawTableMeta,
1186 pub table_type: TableType,
1187}
1188
1189impl RawTableInfo {
1190 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1194 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1195 None
1196 } else {
1197 Some(
1198 self.meta
1199 .column_ids
1200 .iter()
1201 .enumerate()
1202 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1203 .collect(),
1204 )
1205 }
1206 }
1207
1208 pub fn sort_columns(&mut self) {
1210 let column_schemas = &self.meta.schema.column_schemas;
1211 let primary_keys = self
1212 .meta
1213 .primary_key_indices
1214 .iter()
1215 .map(|index| column_schemas[*index].name.clone())
1216 .collect::<HashSet<_>>();
1217
1218 let name_to_ids = self.name_to_ids().unwrap_or_default();
1219 self.meta
1220 .schema
1221 .column_schemas
1222 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1223
1224 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1226 let mut timestamp_index = None;
1227 let mut value_indices =
1228 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1229 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1230 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1231 if primary_keys.contains(&column_schema.name) {
1232 primary_key_indices.push(index);
1233 } else if column_schema.is_time_index() {
1234 value_indices.push(index);
1235 timestamp_index = Some(index);
1236 } else {
1237 value_indices.push(index);
1238 }
1239 if let Some(id) = name_to_ids.get(&column_schema.name) {
1240 column_ids.push(*id);
1241 }
1242 }
1243
1244 self.meta.schema.timestamp_index = timestamp_index;
1246 self.meta.primary_key_indices = primary_key_indices;
1247 self.meta.value_indices = value_indices;
1248 self.meta.column_ids = column_ids;
1249 }
1250
1251 pub fn to_region_options(&self) -> HashMap<String, String> {
1255 HashMap::from(&self.meta.options)
1256 }
1257
1258 pub fn table_ref(&self) -> TableReference {
1260 TableReference::full(
1261 self.catalog_name.as_str(),
1262 self.schema_name.as_str(),
1263 self.name.as_str(),
1264 )
1265 }
1266}
1267
1268impl From<TableInfo> for RawTableInfo {
1269 fn from(info: TableInfo) -> RawTableInfo {
1270 RawTableInfo {
1271 ident: info.ident,
1272 name: info.name,
1273 desc: info.desc,
1274 catalog_name: info.catalog_name,
1275 schema_name: info.schema_name,
1276 meta: RawTableMeta::from(info.meta),
1277 table_type: info.table_type,
1278 }
1279 }
1280}
1281
1282impl TryFrom<RawTableInfo> for TableInfo {
1283 type Error = ConvertError;
1284
1285 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1286 Ok(TableInfo {
1287 ident: raw.ident,
1288 name: raw.name,
1289 desc: raw.desc,
1290 catalog_name: raw.catalog_name,
1291 schema_name: raw.schema_name,
1292 meta: TableMeta::try_from(raw.meta)?,
1293 table_type: raw.table_type,
1294 })
1295 }
1296}
1297
1298fn set_column_fulltext_options(
1307 column_schema: &mut ColumnSchema,
1308 column_name: &str,
1309 options: &FulltextOptions,
1310 current_options: Option<FulltextOptions>,
1311) -> Result<()> {
1312 if let Some(current_options) = current_options {
1313 ensure!(
1314 current_options.analyzer == options.analyzer
1315 && current_options.case_sensitive == options.case_sensitive,
1316 error::InvalidColumnOptionSnafu {
1317 column_name,
1318 msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1319 current_options.analyzer, current_options.case_sensitive),
1320 }
1321 );
1322 }
1323
1324 column_schema
1325 .set_fulltext_options(options)
1326 .context(error::SetFulltextOptionsSnafu { column_name })?;
1327
1328 Ok(())
1329}
1330
1331fn unset_column_fulltext_options(
1332 column_schema: &mut ColumnSchema,
1333 column_name: &str,
1334 current_options: Option<FulltextOptions>,
1335) -> Result<()> {
1336 ensure!(
1337 current_options
1338 .as_ref()
1339 .is_some_and(|options| options.enable),
1340 error::InvalidColumnOptionSnafu {
1341 column_name,
1342 msg: "FULLTEXT index already disabled".to_string(),
1343 }
1344 );
1345
1346 let mut options = current_options.unwrap();
1347 options.enable = false;
1348 column_schema
1349 .set_fulltext_options(&options)
1350 .context(error::SetFulltextOptionsSnafu { column_name })?;
1351
1352 Ok(())
1353}
1354
1355fn set_column_skipping_index_options(
1356 column_schema: &mut ColumnSchema,
1357 column_name: &str,
1358 options: &SkippingIndexOptions,
1359) -> Result<()> {
1360 column_schema
1361 .set_skipping_options(options)
1362 .context(error::SetSkippingOptionsSnafu { column_name })?;
1363
1364 Ok(())
1365}
1366
1367fn unset_column_skipping_index_options(
1368 column_schema: &mut ColumnSchema,
1369 column_name: &str,
1370) -> Result<()> {
1371 column_schema
1372 .unset_skipping_options()
1373 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1374 Ok(())
1375}
1376
1377#[cfg(test)]
1378mod tests {
1379 use std::assert_matches::assert_matches;
1380
1381 use common_error::ext::ErrorExt;
1382 use common_error::status_code::StatusCode;
1383 use datatypes::data_type::ConcreteDataType;
1384 use datatypes::schema::{
1385 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1386 };
1387
1388 use super::*;
1389 use crate::Error;
1390
1391 fn new_test_schema() -> Schema {
1393 let column_schemas = vec![
1394 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1395 ColumnSchema::new(
1396 "ts",
1397 ConcreteDataType::timestamp_millisecond_datatype(),
1398 false,
1399 )
1400 .with_time_index(true),
1401 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1402 ];
1403 SchemaBuilder::try_from(column_schemas)
1404 .unwrap()
1405 .version(123)
1406 .build()
1407 .unwrap()
1408 }
1409
1410 #[test]
1411 fn test_raw_convert() {
1412 let schema = Arc::new(new_test_schema());
1413 let meta = TableMetaBuilder::empty()
1414 .schema(schema)
1415 .primary_key_indices(vec![0])
1416 .engine("engine")
1417 .next_column_id(3)
1418 .build()
1419 .unwrap();
1420 let info = TableInfoBuilder::default()
1421 .table_id(10)
1422 .table_version(5)
1423 .name("mytable")
1424 .meta(meta)
1425 .build()
1426 .unwrap();
1427
1428 let raw = RawTableInfo::from(info.clone());
1429 let info_new = TableInfo::try_from(raw).unwrap();
1430
1431 assert_eq!(info, info_new);
1432 }
1433
1434 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1435 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1436 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1437 let alter_kind = AlterKind::AddColumns {
1438 columns: vec![
1439 AddColumnRequest {
1440 column_schema: new_tag,
1441 is_key: true,
1442 location: None,
1443 add_if_not_exists: false,
1444 },
1445 AddColumnRequest {
1446 column_schema: new_field,
1447 is_key: false,
1448 location: None,
1449 add_if_not_exists: false,
1450 },
1451 ],
1452 };
1453
1454 let builder = meta
1455 .builder_with_alter_kind("my_table", &alter_kind)
1456 .unwrap();
1457 builder.build().unwrap()
1458 }
1459
1460 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1461 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1462 let new_field = ColumnSchema::new(
1463 "my_field_after_ts",
1464 ConcreteDataType::string_datatype(),
1465 true,
1466 );
1467 let yet_another_field = ColumnSchema::new(
1468 "yet_another_field_after_ts",
1469 ConcreteDataType::int64_datatype(),
1470 true,
1471 );
1472 let alter_kind = AlterKind::AddColumns {
1473 columns: vec![
1474 AddColumnRequest {
1475 column_schema: new_tag,
1476 is_key: true,
1477 location: Some(AddColumnLocation::First),
1478 add_if_not_exists: false,
1479 },
1480 AddColumnRequest {
1481 column_schema: new_field,
1482 is_key: false,
1483 location: Some(AddColumnLocation::After {
1484 column_name: "ts".to_string(),
1485 }),
1486 add_if_not_exists: false,
1487 },
1488 AddColumnRequest {
1489 column_schema: yet_another_field,
1490 is_key: true,
1491 location: Some(AddColumnLocation::After {
1492 column_name: "ts".to_string(),
1493 }),
1494 add_if_not_exists: false,
1495 },
1496 ],
1497 };
1498
1499 let builder = meta
1500 .builder_with_alter_kind("my_table", &alter_kind)
1501 .unwrap();
1502 builder.build().unwrap()
1503 }
1504
1505 #[test]
1506 fn test_add_columns() {
1507 let schema = Arc::new(new_test_schema());
1508 let meta = TableMetaBuilder::empty()
1509 .schema(schema)
1510 .primary_key_indices(vec![0])
1511 .engine("engine")
1512 .next_column_id(3)
1513 .build()
1514 .unwrap();
1515
1516 let new_meta = add_columns_to_meta(&meta);
1517 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1518
1519 let names: Vec<String> = new_meta
1520 .schema
1521 .column_schemas()
1522 .iter()
1523 .map(|column_schema| column_schema.name.clone())
1524 .collect();
1525 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1526 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1527 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1528 }
1529
1530 #[test]
1531 fn test_add_columns_multiple_times() {
1532 let schema = Arc::new(new_test_schema());
1533 let meta = TableMetaBuilder::empty()
1534 .schema(schema)
1535 .primary_key_indices(vec![0])
1536 .engine("engine")
1537 .next_column_id(3)
1538 .build()
1539 .unwrap();
1540
1541 let alter_kind = AlterKind::AddColumns {
1542 columns: vec![
1543 AddColumnRequest {
1544 column_schema: ColumnSchema::new(
1545 "col3",
1546 ConcreteDataType::int32_datatype(),
1547 true,
1548 ),
1549 is_key: true,
1550 location: None,
1551 add_if_not_exists: true,
1552 },
1553 AddColumnRequest {
1554 column_schema: ColumnSchema::new(
1555 "col3",
1556 ConcreteDataType::int32_datatype(),
1557 true,
1558 ),
1559 is_key: true,
1560 location: None,
1561 add_if_not_exists: true,
1562 },
1563 ],
1564 };
1565 let err = meta
1566 .builder_with_alter_kind("my_table", &alter_kind)
1567 .err()
1568 .unwrap();
1569 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1570 }
1571
1572 #[test]
1573 fn test_remove_columns() {
1574 let schema = Arc::new(new_test_schema());
1575 let meta = TableMetaBuilder::empty()
1576 .schema(schema.clone())
1577 .primary_key_indices(vec![0])
1578 .engine("engine")
1579 .next_column_id(3)
1580 .build()
1581 .unwrap();
1582 let meta = add_columns_to_meta(&meta);
1584
1585 let alter_kind = AlterKind::DropColumns {
1586 names: vec![String::from("col2"), String::from("my_field")],
1587 };
1588 let new_meta = meta
1589 .builder_with_alter_kind("my_table", &alter_kind)
1590 .unwrap()
1591 .build()
1592 .unwrap();
1593
1594 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1595
1596 let names: Vec<String> = new_meta
1597 .schema
1598 .column_schemas()
1599 .iter()
1600 .map(|column_schema| column_schema.name.clone())
1601 .collect();
1602 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1603 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1604 assert_eq!(&[1], &new_meta.value_indices[..]);
1605 assert_eq!(
1606 schema.timestamp_column(),
1607 new_meta.schema.timestamp_column()
1608 );
1609 }
1610
1611 #[test]
1612 fn test_remove_multiple_columns_before_timestamp() {
1613 let column_schemas = vec![
1614 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1615 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1616 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1617 ColumnSchema::new(
1618 "ts",
1619 ConcreteDataType::timestamp_millisecond_datatype(),
1620 false,
1621 )
1622 .with_time_index(true),
1623 ];
1624 let schema = Arc::new(
1625 SchemaBuilder::try_from(column_schemas)
1626 .unwrap()
1627 .version(123)
1628 .build()
1629 .unwrap(),
1630 );
1631 let meta = TableMetaBuilder::empty()
1632 .schema(schema.clone())
1633 .primary_key_indices(vec![1])
1634 .engine("engine")
1635 .next_column_id(4)
1636 .build()
1637 .unwrap();
1638
1639 let alter_kind = AlterKind::DropColumns {
1641 names: vec![String::from("col3"), String::from("col1")],
1642 };
1643 let new_meta = meta
1644 .builder_with_alter_kind("my_table", &alter_kind)
1645 .unwrap()
1646 .build()
1647 .unwrap();
1648
1649 let names: Vec<String> = new_meta
1650 .schema
1651 .column_schemas()
1652 .iter()
1653 .map(|column_schema| column_schema.name.clone())
1654 .collect();
1655 assert_eq!(&["col2", "ts"], &names[..]);
1656 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1657 assert_eq!(&[1], &new_meta.value_indices[..]);
1658 assert_eq!(
1659 schema.timestamp_column(),
1660 new_meta.schema.timestamp_column()
1661 );
1662 }
1663
1664 #[test]
1665 fn test_add_existing_column() {
1666 let schema = Arc::new(new_test_schema());
1667 let meta = TableMetaBuilder::empty()
1668 .schema(schema)
1669 .primary_key_indices(vec![0])
1670 .engine("engine")
1671 .next_column_id(3)
1672 .build()
1673 .unwrap();
1674
1675 let alter_kind = AlterKind::AddColumns {
1676 columns: vec![AddColumnRequest {
1677 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1678 is_key: false,
1679 location: None,
1680 add_if_not_exists: false,
1681 }],
1682 };
1683
1684 let err = meta
1685 .builder_with_alter_kind("my_table", &alter_kind)
1686 .err()
1687 .unwrap();
1688 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1689
1690 let alter_kind = AlterKind::AddColumns {
1692 columns: vec![AddColumnRequest {
1693 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1694 is_key: true,
1695 location: None,
1696 add_if_not_exists: true,
1697 }],
1698 };
1699 let new_meta = meta
1700 .builder_with_alter_kind("my_table", &alter_kind)
1701 .unwrap()
1702 .build()
1703 .unwrap();
1704 assert_eq!(
1705 meta.schema.column_schemas(),
1706 new_meta.schema.column_schemas()
1707 );
1708 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1709 }
1710
1711 #[test]
1712 fn test_add_different_type_column() {
1713 let schema = Arc::new(new_test_schema());
1714 let meta = TableMetaBuilder::empty()
1715 .schema(schema)
1716 .primary_key_indices(vec![0])
1717 .engine("engine")
1718 .next_column_id(3)
1719 .build()
1720 .unwrap();
1721
1722 let alter_kind = AlterKind::AddColumns {
1724 columns: vec![AddColumnRequest {
1725 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1726 is_key: false,
1727 location: None,
1728 add_if_not_exists: true,
1729 }],
1730 };
1731 let err = meta
1732 .builder_with_alter_kind("my_table", &alter_kind)
1733 .err()
1734 .unwrap();
1735 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1736 }
1737
1738 #[test]
1739 fn test_add_invalid_column() {
1740 let schema = Arc::new(new_test_schema());
1741 let meta = TableMetaBuilder::empty()
1742 .schema(schema)
1743 .primary_key_indices(vec![0])
1744 .engine("engine")
1745 .next_column_id(3)
1746 .build()
1747 .unwrap();
1748
1749 let alter_kind = AlterKind::AddColumns {
1751 columns: vec![AddColumnRequest {
1752 column_schema: ColumnSchema::new(
1753 "weny",
1754 ConcreteDataType::string_datatype(),
1755 false,
1756 ),
1757 is_key: false,
1758 location: None,
1759 add_if_not_exists: false,
1760 }],
1761 };
1762
1763 let err = meta
1764 .builder_with_alter_kind("my_table", &alter_kind)
1765 .err()
1766 .unwrap();
1767 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1768 }
1769
1770 #[test]
1771 fn test_remove_unknown_column() {
1772 let schema = Arc::new(new_test_schema());
1773 let meta = TableMetaBuilder::empty()
1774 .schema(schema)
1775 .primary_key_indices(vec![0])
1776 .engine("engine")
1777 .next_column_id(3)
1778 .build()
1779 .unwrap();
1780
1781 let alter_kind = AlterKind::DropColumns {
1782 names: vec![String::from("unknown")],
1783 };
1784
1785 let err = meta
1786 .builder_with_alter_kind("my_table", &alter_kind)
1787 .err()
1788 .unwrap();
1789 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1790 }
1791
1792 #[test]
1793 fn test_change_unknown_column_data_type() {
1794 let schema = Arc::new(new_test_schema());
1795 let meta = TableMetaBuilder::empty()
1796 .schema(schema)
1797 .primary_key_indices(vec![0])
1798 .engine("engine")
1799 .next_column_id(3)
1800 .build()
1801 .unwrap();
1802
1803 let alter_kind = AlterKind::ModifyColumnTypes {
1804 columns: vec![ModifyColumnTypeRequest {
1805 column_name: "unknown".to_string(),
1806 target_type: ConcreteDataType::string_datatype(),
1807 }],
1808 };
1809
1810 let err = meta
1811 .builder_with_alter_kind("my_table", &alter_kind)
1812 .err()
1813 .unwrap();
1814 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1815 }
1816
1817 #[test]
1818 fn test_remove_key_column() {
1819 let schema = Arc::new(new_test_schema());
1820 let meta = TableMetaBuilder::empty()
1821 .schema(schema)
1822 .primary_key_indices(vec![0])
1823 .engine("engine")
1824 .next_column_id(3)
1825 .build()
1826 .unwrap();
1827
1828 let alter_kind = AlterKind::DropColumns {
1830 names: vec![String::from("col1")],
1831 };
1832
1833 let err = meta
1834 .builder_with_alter_kind("my_table", &alter_kind)
1835 .err()
1836 .unwrap();
1837 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1838
1839 let alter_kind = AlterKind::DropColumns {
1841 names: vec![String::from("ts")],
1842 };
1843
1844 let err = meta
1845 .builder_with_alter_kind("my_table", &alter_kind)
1846 .err()
1847 .unwrap();
1848 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1849 }
1850
1851 #[test]
1852 fn test_remove_partition_column() {
1853 let schema = Arc::new(new_test_schema());
1854 let meta = TableMetaBuilder::empty()
1855 .schema(schema)
1856 .primary_key_indices(vec![])
1857 .partition_key_indices(vec![0])
1858 .engine("engine")
1859 .next_column_id(3)
1860 .build()
1861 .unwrap();
1862 let alter_kind = AlterKind::DropColumns {
1864 names: vec![String::from("col1")],
1865 };
1866
1867 let err = meta
1868 .builder_with_alter_kind("my_table", &alter_kind)
1869 .err()
1870 .unwrap();
1871 assert_matches!(err, Error::RemovePartitionColumn { .. });
1872 }
1873
1874 #[test]
1875 fn test_change_key_column_data_type() {
1876 let schema = Arc::new(new_test_schema());
1877 let meta = TableMetaBuilder::empty()
1878 .schema(schema)
1879 .primary_key_indices(vec![0])
1880 .engine("engine")
1881 .next_column_id(3)
1882 .build()
1883 .unwrap();
1884
1885 let alter_kind = AlterKind::ModifyColumnTypes {
1887 columns: vec![ModifyColumnTypeRequest {
1888 column_name: "col1".to_string(),
1889 target_type: ConcreteDataType::string_datatype(),
1890 }],
1891 };
1892
1893 let err = meta
1894 .builder_with_alter_kind("my_table", &alter_kind)
1895 .err()
1896 .unwrap();
1897 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1898
1899 let alter_kind = AlterKind::ModifyColumnTypes {
1901 columns: vec![ModifyColumnTypeRequest {
1902 column_name: "ts".to_string(),
1903 target_type: ConcreteDataType::string_datatype(),
1904 }],
1905 };
1906
1907 let err = meta
1908 .builder_with_alter_kind("my_table", &alter_kind)
1909 .err()
1910 .unwrap();
1911 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1912 }
1913
1914 #[test]
1915 fn test_alloc_new_column() {
1916 let schema = Arc::new(new_test_schema());
1917 let mut meta = TableMetaBuilder::empty()
1918 .schema(schema)
1919 .primary_key_indices(vec![0])
1920 .engine("engine")
1921 .next_column_id(3)
1922 .build()
1923 .unwrap();
1924 assert_eq!(3, meta.next_column_id);
1925
1926 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1927 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1928
1929 assert_eq!(4, meta.next_column_id);
1930 assert_eq!(column_schema.name, desc.name);
1931 }
1932
1933 #[test]
1934 fn test_add_columns_with_location() {
1935 let schema = Arc::new(new_test_schema());
1936 let meta = TableMetaBuilder::empty()
1937 .schema(schema)
1938 .primary_key_indices(vec![0])
1939 .partition_key_indices(vec![0, 2])
1941 .engine("engine")
1942 .next_column_id(3)
1943 .build()
1944 .unwrap();
1945
1946 let new_meta = add_columns_to_meta_with_location(&meta);
1947 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1948
1949 let names: Vec<String> = new_meta
1950 .schema
1951 .column_schemas()
1952 .iter()
1953 .map(|column_schema| column_schema.name.clone())
1954 .collect();
1955 assert_eq!(
1956 &[
1957 "my_tag_first", "col1", "ts", "yet_another_field_after_ts", "my_field_after_ts", "col2", ],
1964 &names[..]
1965 );
1966 assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
1967 assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
1968 assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
1969 }
1970
1971 #[test]
1972 fn test_modify_column_fulltext_options() {
1973 let schema = Arc::new(new_test_schema());
1974 let meta = TableMetaBuilder::empty()
1975 .schema(schema)
1976 .primary_key_indices(vec![0])
1977 .engine("engine")
1978 .next_column_id(3)
1979 .build()
1980 .unwrap();
1981
1982 let alter_kind = AlterKind::SetIndex {
1983 options: SetIndexOptions::Fulltext {
1984 column_name: "col1".to_string(),
1985 options: FulltextOptions::default(),
1986 },
1987 };
1988 let err = meta
1989 .builder_with_alter_kind("my_table", &alter_kind)
1990 .err()
1991 .unwrap();
1992 assert_eq!(
1993 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
1994 err.to_string()
1995 );
1996
1997 let new_meta = add_columns_to_meta_with_location(&meta);
1999 assert_eq!(meta.region_numbers, new_meta.region_numbers);
2000
2001 let alter_kind = AlterKind::SetIndex {
2002 options: SetIndexOptions::Fulltext {
2003 column_name: "my_tag_first".to_string(),
2004 options: FulltextOptions::new_unchecked(
2005 true,
2006 FulltextAnalyzer::Chinese,
2007 true,
2008 FulltextBackend::Bloom,
2009 1000,
2010 0.01,
2011 ),
2012 },
2013 };
2014 let new_meta = new_meta
2015 .builder_with_alter_kind("my_table", &alter_kind)
2016 .unwrap()
2017 .build()
2018 .unwrap();
2019 let column_schema = new_meta
2020 .schema
2021 .column_schema_by_name("my_tag_first")
2022 .unwrap();
2023 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2024 assert!(fulltext_options.enable);
2025 assert_eq!(
2026 datatypes::schema::FulltextAnalyzer::Chinese,
2027 fulltext_options.analyzer
2028 );
2029 assert!(fulltext_options.case_sensitive);
2030
2031 let alter_kind = AlterKind::UnsetIndex {
2032 options: UnsetIndexOptions::Fulltext {
2033 column_name: "my_tag_first".to_string(),
2034 },
2035 };
2036 let new_meta = new_meta
2037 .builder_with_alter_kind("my_table", &alter_kind)
2038 .unwrap()
2039 .build()
2040 .unwrap();
2041 let column_schema = new_meta
2042 .schema
2043 .column_schema_by_name("my_tag_first")
2044 .unwrap();
2045 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2046 assert!(!fulltext_options.enable);
2047 }
2048}