1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
/// Numeric identifier of a table.
pub type TableId = u32;
/// Version counter of a table (stored in [`TableIdent::version`]).
pub type TableVersion = u64;
45
/// Level of filter push-down support a table reports.
///
/// The variants map one-to-one onto DataFusion's `TableProviderFilterPushDown`
/// through the `From` conversions defined in this file.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// Counterpart of `TableProviderFilterPushDown::Unsupported`.
    Unsupported,
    /// Counterpart of `TableProviderFilterPushDown::Inexact`.
    Inexact,
    /// Counterpart of `TableProviderFilterPushDown::Exact`.
    Exact,
}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
/// Kind of a table.
///
/// Displayed as `BASE TABLE`, `VIEW` or `TEMPORARY` by the `Display` impl in this file.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// An ordinary base table.
    Base,
    /// A view.
    View,
    /// A temporary table.
    Temporary,
}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105 fn from(t: TableType) -> datafusion::datasource::TableType {
106 match t {
107 TableType::Base => datafusion::datasource::TableType::Base,
108 TableType::View => datafusion::datasource::TableType::View,
109 TableType::Temporary => datafusion::datasource::TableType::Temporary,
110 }
111 }
112}
113
/// Identifier of a table: its id together with a version number.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Id of the table.
    pub table_id: TableId,
    /// Version of the table; `TableIdent::new` starts it at 0.
    pub version: TableVersion,
}
123
/// Metadata of a table: its schema, key layout, engine, options and timestamps.
///
/// A `TableMetaBuilder` is generated by `derive_builder`. The builder uses the
/// mutable pattern and a custom constructor.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table.
    pub schema: SchemaRef,
    /// Positions (into the schema's column list) of the primary key columns.
    pub primary_key_indices: Vec<usize>,
    /// Positions of the columns that are not part of the primary key.
    ///
    /// When not set on the builder, it is derived as every column position
    /// that is absent from `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Name of the engine of this table.
    #[builder(default, setter(into))]
    pub engine: String,
    /// Id that `alloc_new_column` hands out to the next allocated column.
    pub next_column_id: ColumnId,
    /// Options of the table.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation time; the builder defaults it to the current time.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Last update time; the builder defaults it to the `created_on` value set
    /// on the builder, or to `DateTime::default()` when none was set.
    #[builder(default = "self.default_updated_on()")]
    pub updated_on: DateTime<Utc>,
    /// Positions (into the schema's column list) of the partition columns.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
    /// Column ids of the table.
    // NOTE(review): presumably parallel to the schema's columns — confirm with the writers.
    #[builder(default = "Vec::new()")]
    pub column_ids: Vec<ColumnId>,
}
151
impl TableMetaBuilder {
    /// Creates a builder with every field unset.
    ///
    /// Only compiled for tests or when the `testing` feature is enabled.
    #[cfg(any(test, feature = "testing"))]
    pub fn empty() -> Self {
        Self {
            schema: None,
            primary_key_indices: None,
            value_indices: None,
            engine: None,
            next_column_id: None,
            options: None,
            created_on: None,
            updated_on: None,
            partition_key_indices: None,
            column_ids: None,
        }
    }
}
170
171impl TableMetaBuilder {
172 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
173 match (&self.primary_key_indices, &self.schema) {
174 (Some(v), Some(schema)) => {
175 let column_schemas = schema.column_schemas();
176 Ok((0..column_schemas.len())
177 .filter(|idx| !v.contains(idx))
178 .collect())
179 }
180 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
181 }
182 }
183
184 fn default_updated_on(&self) -> DateTime<Utc> {
185 self.created_on.unwrap_or_default()
186 }
187
188 pub fn new_external_table() -> Self {
189 Self {
190 schema: None,
191 primary_key_indices: Some(Vec::new()),
192 value_indices: Some(Vec::new()),
193 engine: None,
194 next_column_id: Some(0),
195 options: None,
196 created_on: None,
197 updated_on: None,
198 partition_key_indices: None,
199 column_ids: None,
200 }
201 }
202}
203
/// Add-column requests grouped by where the new column is to be placed.
///
/// Produced by `TableMeta::split_requests_by_column_location`.
struct SplitResult<'a> {
    /// Requests whose location is `AddColumnLocation::First`, in request order.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Requests whose location is `AddColumnLocation::After`, keyed by the name of
    /// the existing column they follow; each list is in request order.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Requests without a location, in request order.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all requested columns, in request order.
    column_names: Vec<String>,
}
215
216impl TableMeta {
217 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
218 let columns_schemas = &self.schema.column_schemas();
219 self.primary_key_indices
220 .iter()
221 .map(|idx| &columns_schemas[*idx].name)
222 }
223
224 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
225 let columns_schemas = self.schema.column_schemas();
227 let primary_key_indices = &self.primary_key_indices;
228 columns_schemas
229 .iter()
230 .enumerate()
231 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
232 .map(|(_, cs)| &cs.name)
233 }
234
235 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
236 let columns_schemas = &self.schema.column_schemas();
237 self.partition_key_indices
238 .iter()
239 .map(|idx| &columns_schemas[*idx].name)
240 }
241
242 pub fn builder_with_alter_kind(
246 &self,
247 table_name: &str,
248 alter_kind: &AlterKind,
249 ) -> Result<TableMetaBuilder> {
250 let mut builder = match alter_kind {
251 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
252 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
253 AlterKind::ModifyColumnTypes { columns } => {
254 self.modify_column_types(table_name, columns)
255 }
256 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
258 AlterKind::SetTableOptions { options } => self.set_table_options(options),
259 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
260 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
261 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
262 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
263 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
264 }?;
265 let _ = builder.updated_on(Utc::now());
266 Ok(builder)
267 }
268
269 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
271 let mut new_options = self.options.clone();
272
273 for request in requests {
274 match request {
275 SetRegionOption::Ttl(new_ttl) => {
276 new_options.ttl = *new_ttl;
277 }
278 SetRegionOption::Twsc(key, value) => {
279 if !value.is_empty() {
280 new_options.extra_options.insert(key.clone(), value.clone());
281 new_options.extra_options.insert(
283 COMPACTION_TYPE.to_string(),
284 COMPACTION_TYPE_TWCS.to_string(),
285 );
286 } else {
287 new_options.extra_options.remove(key.as_str());
289 }
290 }
291 SetRegionOption::Format(value) => {
292 new_options
293 .extra_options
294 .insert(SST_FORMAT_KEY.to_string(), value.clone());
295 }
296 }
297 }
298 let mut builder = self.new_meta_builder();
299 builder.options(new_options);
300
301 Ok(builder)
302 }
303
304 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
305 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
306 self.set_table_options(&requests)
307 }
308
309 fn set_indexes(
310 &self,
311 table_name: &str,
312 requests: &[SetIndexOption],
313 ) -> Result<TableMetaBuilder> {
314 let table_schema = &self.schema;
315 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
316 for request in requests {
317 let column_name = request.column_name();
318 table_schema
319 .column_index_by_name(column_name)
320 .with_context(|| error::ColumnNotExistsSnafu {
321 column_name,
322 table_name,
323 })?;
324 set_index_options
325 .entry(column_name)
326 .or_default()
327 .push(request);
328 }
329
330 let mut meta_builder = self.new_meta_builder();
331 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
332 for mut column in table_schema.column_schemas().iter().cloned() {
333 if let Some(request) = set_index_options.get(column.name.as_str()) {
334 for request in request {
335 self.set_index(&mut column, request)?;
336 }
337 }
338 columns.push(column);
339 }
340
341 let mut builder = SchemaBuilder::try_from_columns(columns)
342 .with_context(|_| error::SchemaBuildSnafu {
343 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
344 })?
345 .version(table_schema.version() + 1);
346
347 for (k, v) in table_schema.metadata().iter() {
348 builder = builder.add_metadata(k, v);
349 }
350
351 let new_schema = builder.build().with_context(|_| {
352 let column_names = requests
353 .iter()
354 .map(|request| request.column_name())
355 .collect::<Vec<_>>();
356 error::SchemaBuildSnafu {
357 msg: format!(
358 "Table {table_name} cannot set index options with columns {column_names:?}",
359 ),
360 }
361 })?;
362 let _ = meta_builder
363 .schema(Arc::new(new_schema))
364 .primary_key_indices(self.primary_key_indices.clone());
365
366 Ok(meta_builder)
367 }
368
369 fn unset_indexes(
370 &self,
371 table_name: &str,
372 requests: &[UnsetIndexOption],
373 ) -> Result<TableMetaBuilder> {
374 let table_schema = &self.schema;
375 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
376 for request in requests {
377 let column_name = request.column_name();
378 table_schema
379 .column_index_by_name(column_name)
380 .with_context(|| error::ColumnNotExistsSnafu {
381 column_name,
382 table_name,
383 })?;
384 set_index_options
385 .entry(column_name)
386 .or_default()
387 .push(request);
388 }
389
390 let mut meta_builder = self.new_meta_builder();
391 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
392 for mut column in table_schema.column_schemas().iter().cloned() {
393 if let Some(request) = set_index_options.get(column.name.as_str()) {
394 for request in request {
395 self.unset_index(&mut column, request)?;
396 }
397 }
398 columns.push(column);
399 }
400
401 let mut builder = SchemaBuilder::try_from_columns(columns)
402 .with_context(|_| error::SchemaBuildSnafu {
403 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
404 })?
405 .version(table_schema.version() + 1);
406
407 for (k, v) in table_schema.metadata().iter() {
408 builder = builder.add_metadata(k, v);
409 }
410
411 let new_schema = builder.build().with_context(|_| {
412 let column_names = requests
413 .iter()
414 .map(|request| request.column_name())
415 .collect::<Vec<_>>();
416 error::SchemaBuildSnafu {
417 msg: format!(
418 "Table {table_name} cannot set index options with columns {column_names:?}",
419 ),
420 }
421 })?;
422 let _ = meta_builder
423 .schema(Arc::new(new_schema))
424 .primary_key_indices(self.primary_key_indices.clone());
425
426 Ok(meta_builder)
427 }
428
429 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
430 match request {
431 SetIndexOption::Fulltext {
432 column_name,
433 options,
434 } => {
435 ensure!(
436 column_schema.data_type.is_string(),
437 error::InvalidColumnOptionSnafu {
438 column_name,
439 msg: "FULLTEXT index only supports string type",
440 }
441 );
442
443 let current_fulltext_options = column_schema
444 .fulltext_options()
445 .context(error::SetFulltextOptionsSnafu { column_name })?;
446 set_column_fulltext_options(
447 column_schema,
448 column_name,
449 options,
450 current_fulltext_options,
451 )?;
452 }
453 SetIndexOption::Inverted { column_name } => {
454 debug_assert_eq!(column_schema.name, *column_name);
455 column_schema.set_inverted_index(true);
456 }
457 SetIndexOption::Skipping {
458 column_name,
459 options,
460 } => {
461 set_column_skipping_index_options(column_schema, column_name, options)?;
462 }
463 }
464
465 Ok(())
466 }
467
468 fn unset_index(
469 &self,
470 column_schema: &mut ColumnSchema,
471 request: &UnsetIndexOption,
472 ) -> Result<()> {
473 match request {
474 UnsetIndexOption::Fulltext { column_name } => {
475 let current_fulltext_options = column_schema
476 .fulltext_options()
477 .context(error::SetFulltextOptionsSnafu { column_name })?;
478 unset_column_fulltext_options(
479 column_schema,
480 column_name,
481 current_fulltext_options.clone(),
482 )?
483 }
484 UnsetIndexOption::Inverted { .. } => {
485 column_schema.set_inverted_index(false);
486 }
487 UnsetIndexOption::Skipping { column_name } => {
488 unset_column_skipping_index_options(column_schema, column_name)?;
489 }
490 }
491
492 Ok(())
493 }
494
495 pub fn alloc_new_column(
500 &mut self,
501 table_name: &str,
502 new_column: &ColumnSchema,
503 ) -> Result<ColumnDescriptor> {
504 let desc = ColumnDescriptorBuilder::new(
505 self.next_column_id as ColumnId,
506 &new_column.name,
507 new_column.data_type.clone(),
508 )
509 .is_nullable(new_column.is_nullable())
510 .default_constraint(new_column.default_constraint().cloned())
511 .build()
512 .context(error::BuildColumnDescriptorSnafu {
513 table_name,
514 column_name: &new_column.name,
515 })?;
516
517 self.next_column_id += 1;
519
520 Ok(desc)
521 }
522
523 fn new_meta_builder(&self) -> TableMetaBuilder {
525 let mut builder = TableMetaBuilder::from(self);
526 builder.value_indices = None;
528 builder
529 }
530
531 fn add_columns(
533 &self,
534 table_name: &str,
535 requests: &[AddColumnRequest],
536 ) -> Result<TableMetaBuilder> {
537 let table_schema = &self.schema;
538 let mut meta_builder = self.new_meta_builder();
539 let original_primary_key_indices: HashSet<&usize> =
540 self.primary_key_indices.iter().collect();
541
542 let mut names = HashSet::with_capacity(requests.len());
543 let mut new_columns = Vec::with_capacity(requests.len());
544 for col_to_add in requests {
545 if let Some(column_schema) =
546 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
547 {
548 ensure!(
550 col_to_add.add_if_not_exists,
551 error::ColumnExistsSnafu {
552 table_name,
553 column_name: &col_to_add.column_schema.name
554 },
555 );
556
557 ensure!(
559 column_schema.data_type == col_to_add.column_schema.data_type,
560 error::InvalidAlterRequestSnafu {
561 table: table_name,
562 err: format!(
563 "column {} already exists with different type {:?}",
564 col_to_add.column_schema.name, column_schema.data_type,
565 ),
566 }
567 );
568 } else {
569 ensure!(
572 names.insert(&col_to_add.column_schema.name),
573 error::InvalidAlterRequestSnafu {
574 table: table_name,
575 err: format!(
576 "add column {} more than once",
577 col_to_add.column_schema.name
578 ),
579 }
580 );
581
582 ensure!(
583 col_to_add.column_schema.is_nullable()
584 || col_to_add.column_schema.default_constraint().is_some(),
585 error::InvalidAlterRequestSnafu {
586 table: table_name,
587 err: format!(
588 "no default value for column {}",
589 col_to_add.column_schema.name
590 ),
591 },
592 );
593
594 new_columns.push(col_to_add.clone());
595 }
596 }
597 let requests = &new_columns[..];
598
599 let SplitResult {
600 columns_at_first,
601 columns_at_after,
602 columns_at_last,
603 column_names,
604 } = self.split_requests_by_column_location(table_name, requests)?;
605 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
606 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
607 columns_at_first.iter().rev().for_each(|request| {
609 if request.is_key {
610 primary_key_indices.push(columns.len());
612 }
613 columns.push(request.column_schema.clone());
614 });
615 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
617 if original_primary_key_indices.contains(&index) {
618 primary_key_indices.push(columns.len());
619 }
620 columns.push(column_schema.clone());
621 if let Some(requests) = columns_at_after.get(&column_schema.name) {
622 requests.iter().rev().for_each(|request| {
623 if request.is_key {
624 primary_key_indices.push(columns.len());
626 }
627 columns.push(request.column_schema.clone());
628 });
629 }
630 }
631 columns_at_last.iter().for_each(|request| {
633 if request.is_key {
634 primary_key_indices.push(columns.len());
636 }
637 columns.push(request.column_schema.clone());
638 });
639
640 let mut builder = SchemaBuilder::try_from(columns)
641 .with_context(|_| error::SchemaBuildSnafu {
642 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
643 })?
644 .version(table_schema.version() + 1);
646 for (k, v) in table_schema.metadata().iter() {
647 builder = builder.add_metadata(k, v);
648 }
649 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
650 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
651 })?;
652
653 let partition_key_indices = self
654 .partition_key_indices
655 .iter()
656 .map(|idx| table_schema.column_name_by_index(*idx))
657 .map(|name| new_schema.column_index_by_name(name).unwrap())
659 .collect();
660
661 let _ = meta_builder
663 .schema(Arc::new(new_schema))
664 .primary_key_indices(primary_key_indices)
665 .partition_key_indices(partition_key_indices);
666
667 Ok(meta_builder)
668 }
669
670 fn remove_columns(
671 &self,
672 table_name: &str,
673 column_names: &[String],
674 ) -> Result<TableMetaBuilder> {
675 let table_schema = &self.schema;
676 let column_names: HashSet<_> = column_names.iter().collect();
677 let mut meta_builder = self.new_meta_builder();
678
679 let timestamp_index = table_schema.timestamp_index();
680 for column_name in &column_names {
682 if let Some(index) = table_schema.column_index_by_name(column_name) {
683 ensure!(
686 !self.primary_key_indices.contains(&index),
687 error::RemoveColumnInIndexSnafu {
688 column_name: *column_name,
689 table_name,
690 }
691 );
692
693 ensure!(
694 !self.partition_key_indices.contains(&index),
695 error::RemovePartitionColumnSnafu {
696 column_name: *column_name,
697 table_name,
698 }
699 );
700
701 if let Some(ts_index) = timestamp_index {
702 ensure!(
704 index != ts_index,
705 error::RemoveColumnInIndexSnafu {
706 column_name: table_schema.column_name_by_index(ts_index),
707 table_name,
708 }
709 );
710 }
711 } else {
712 return error::ColumnNotExistsSnafu {
713 column_name: *column_name,
714 table_name,
715 }
716 .fail()?;
717 }
718 }
719
720 let columns: Vec<_> = table_schema
722 .column_schemas()
723 .iter()
724 .filter(|column_schema| !column_names.contains(&column_schema.name))
725 .cloned()
726 .collect();
727
728 let mut builder = SchemaBuilder::try_from_columns(columns)
729 .with_context(|_| error::SchemaBuildSnafu {
730 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
731 })?
732 .version(table_schema.version() + 1);
734 for (k, v) in table_schema.metadata().iter() {
735 builder = builder.add_metadata(k, v);
736 }
737 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
738 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
739 })?;
740
741 let primary_key_indices = self
743 .primary_key_indices
744 .iter()
745 .map(|idx| table_schema.column_name_by_index(*idx))
746 .map(|name| new_schema.column_index_by_name(name).unwrap())
748 .collect();
749
750 let partition_key_indices = self
751 .partition_key_indices
752 .iter()
753 .map(|idx| table_schema.column_name_by_index(*idx))
754 .map(|name| new_schema.column_index_by_name(name).unwrap())
756 .collect();
757
758 let _ = meta_builder
759 .schema(Arc::new(new_schema))
760 .primary_key_indices(primary_key_indices)
761 .partition_key_indices(partition_key_indices);
762
763 Ok(meta_builder)
764 }
765
766 fn modify_column_types(
767 &self,
768 table_name: &str,
769 requests: &[ModifyColumnTypeRequest],
770 ) -> Result<TableMetaBuilder> {
771 let table_schema = &self.schema;
772 let mut meta_builder = self.new_meta_builder();
773
774 let mut modify_column_types = HashMap::with_capacity(requests.len());
775 let timestamp_index = table_schema.timestamp_index();
776
777 for col_to_change in requests {
778 let change_column_name = &col_to_change.column_name;
779
780 let index = table_schema
781 .column_index_by_name(change_column_name)
782 .with_context(|| error::ColumnNotExistsSnafu {
783 column_name: change_column_name,
784 table_name,
785 })?;
786
787 let column = &table_schema.column_schemas()[index];
788
789 ensure!(
790 !self.primary_key_indices.contains(&index),
791 error::InvalidAlterRequestSnafu {
792 table: table_name,
793 err: format!(
794 "Not allowed to change primary key index column '{}'",
795 column.name
796 )
797 }
798 );
799
800 if let Some(ts_index) = timestamp_index {
801 ensure!(
803 index != ts_index,
804 error::InvalidAlterRequestSnafu {
805 table: table_name,
806 err: format!(
807 "Not allowed to change timestamp index column '{}' datatype",
808 column.name
809 )
810 }
811 );
812 }
813
814 ensure!(
815 modify_column_types
816 .insert(&col_to_change.column_name, col_to_change)
817 .is_none(),
818 error::InvalidAlterRequestSnafu {
819 table: table_name,
820 err: format!(
821 "change column datatype {} more than once",
822 col_to_change.column_name
823 ),
824 }
825 );
826
827 ensure!(
828 column
829 .data_type
830 .can_arrow_type_cast_to(&col_to_change.target_type),
831 error::InvalidAlterRequestSnafu {
832 table: table_name,
833 err: format!(
834 "column '{}' cannot be cast automatically to type '{}'",
835 col_to_change.column_name, col_to_change.target_type,
836 ),
837 }
838 );
839
840 ensure!(
841 column.is_nullable(),
842 error::InvalidAlterRequestSnafu {
843 table: table_name,
844 err: format!(
845 "column '{}' must be nullable to ensure safe conversion.",
846 col_to_change.column_name,
847 ),
848 }
849 );
850 }
851 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
854 for mut column in table_schema.column_schemas().iter().cloned() {
855 if let Some(change_column) = modify_column_types.get(&column.name) {
856 column.data_type = change_column.target_type.clone();
857 let new_default = if let Some(default_value) = column.default_constraint() {
858 Some(
859 default_value
860 .cast_to_datatype(&change_column.target_type)
861 .with_context(|_| error::CastDefaultValueSnafu {
862 reason: format!(
863 "Failed to cast default value from {:?} to type {:?}",
864 default_value, &change_column.target_type
865 ),
866 })?,
867 )
868 } else {
869 None
870 };
871 column = column
872 .clone()
873 .with_default_constraint(new_default.clone())
874 .with_context(|_| error::CastDefaultValueSnafu {
875 reason: format!("Failed to set new default: {:?}", new_default),
876 })?;
877 }
878 columns.push(column)
879 }
880
881 let mut builder = SchemaBuilder::try_from_columns(columns)
882 .with_context(|_| error::SchemaBuildSnafu {
883 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
884 })?
885 .version(table_schema.version() + 1);
887 for (k, v) in table_schema.metadata().iter() {
888 builder = builder.add_metadata(k, v);
889 }
890 let new_schema = builder.build().with_context(|_| {
891 let column_names: Vec<_> = requests
892 .iter()
893 .map(|request| &request.column_name)
894 .collect();
895
896 error::SchemaBuildSnafu {
897 msg: format!(
898 "Table {table_name} cannot change datatype with columns {column_names:?}"
899 ),
900 }
901 })?;
902
903 let _ = meta_builder
904 .schema(Arc::new(new_schema))
905 .primary_key_indices(self.primary_key_indices.clone());
906
907 Ok(meta_builder)
908 }
909
910 fn split_requests_by_column_location<'a>(
912 &self,
913 table_name: &str,
914 requests: &'a [AddColumnRequest],
915 ) -> Result<SplitResult<'a>> {
916 let table_schema = &self.schema;
917 let mut columns_at_first = Vec::new();
918 let mut columns_at_after = HashMap::new();
919 let mut columns_at_last = Vec::new();
920 let mut column_names = Vec::with_capacity(requests.len());
921 for request in requests {
922 let column_name = &request.column_schema.name;
924 column_names.push(column_name.clone());
925 ensure!(
926 table_schema.column_schema_by_name(column_name).is_none(),
927 error::ColumnExistsSnafu {
928 column_name,
929 table_name,
930 }
931 );
932 match request.location.as_ref() {
933 Some(AddColumnLocation::First) => {
934 columns_at_first.push(request);
935 }
936 Some(AddColumnLocation::After { column_name }) => {
937 ensure!(
938 table_schema.column_schema_by_name(column_name).is_some(),
939 error::ColumnNotExistsSnafu {
940 column_name,
941 table_name,
942 }
943 );
944 columns_at_after
945 .entry(column_name.clone())
946 .or_insert(Vec::new())
947 .push(request);
948 }
949 None => {
950 columns_at_last.push(request);
951 }
952 }
953 }
954 Ok(SplitResult {
955 columns_at_first,
956 columns_at_after,
957 columns_at_last,
958 column_names,
959 })
960 }
961
962 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
963 let table_schema = &self.schema;
964 let mut meta_builder = self.new_meta_builder();
965 let mut columns = Vec::with_capacity(table_schema.num_columns());
966 for column_schema in table_schema.column_schemas() {
967 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
968 ensure!(
970 column_schema.default_constraint().is_some(),
971 error::InvalidAlterRequestSnafu {
972 table: table_name,
973 err: format!("column {name} does not have a default value"),
974 }
975 );
976 if !column_schema.is_nullable() {
977 return error::InvalidAlterRequestSnafu {
978 table: table_name,
979 err: format!(
980 "column {name} is not nullable and `default` cannot be dropped",
981 ),
982 }
983 .fail();
984 }
985 let new_column_schema = column_schema.clone();
986 let new_column_schema = new_column_schema
987 .with_default_constraint(None)
988 .with_context(|_| error::SchemaBuildSnafu {
989 msg: format!("Table {table_name} cannot drop default values"),
990 })?;
991 columns.push(new_column_schema);
992 } else {
993 columns.push(column_schema.clone());
994 }
995 }
996
997 let mut builder = SchemaBuilder::try_from_columns(columns)
998 .with_context(|_| error::SchemaBuildSnafu {
999 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1000 })?
1001 .version(table_schema.version() + 1);
1003 for (k, v) in table_schema.metadata().iter() {
1004 builder = builder.add_metadata(k, v);
1005 }
1006 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1007 msg: format!("Table {table_name} cannot drop default values"),
1008 })?;
1009
1010 let _ = meta_builder.schema(Arc::new(new_schema));
1011
1012 Ok(meta_builder)
1013 }
1014
1015 fn set_defaults(
1016 &self,
1017 table_name: &str,
1018 set_defaults: &[SetDefaultRequest],
1019 ) -> Result<TableMetaBuilder> {
1020 let table_schema = &self.schema;
1021 let mut meta_builder = self.new_meta_builder();
1022 let mut columns = Vec::with_capacity(table_schema.num_columns());
1023 for column_schema in table_schema.column_schemas() {
1024 if let Some(set_default) = set_defaults
1025 .iter()
1026 .find(|s| s.column_name == column_schema.name)
1027 {
1028 let new_column_schema = column_schema.clone();
1029 let new_column_schema = new_column_schema
1030 .with_default_constraint(set_default.default_constraint.clone())
1031 .with_context(|_| error::SchemaBuildSnafu {
1032 msg: format!("Table {table_name} cannot set default values"),
1033 })?;
1034 columns.push(new_column_schema);
1035 } else {
1036 columns.push(column_schema.clone());
1037 }
1038 }
1039
1040 let mut builder = SchemaBuilder::try_from_columns(columns)
1041 .with_context(|_| error::SchemaBuildSnafu {
1042 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1043 })?
1044 .version(table_schema.version() + 1);
1046 for (k, v) in table_schema.metadata().iter() {
1047 builder = builder.add_metadata(k, v);
1048 }
1049 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1050 msg: format!("Table {table_name} cannot set default values"),
1051 })?;
1052
1053 let _ = meta_builder.schema(Arc::new(new_schema));
1054
1055 Ok(meta_builder)
1056 }
1057}
1058
/// Description of a table: its identity, qualified name, type and metadata.
///
/// A `TableInfoBuilder` is generated by `derive_builder` using the owned pattern.
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table.
    #[builder(setter(into))]
    pub name: String,
    /// Optional description of the table.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Catalog the table belongs to; defaults to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Schema (database) the table belongs to; defaults to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table.
    pub meta: TableMeta,
    /// Type of the table; defaults to [`TableType::Base`].
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
1079
/// Shared, reference-counted [`TableInfo`].
pub type TableInfoRef = Arc<TableInfo>;
1081
1082impl TableInfo {
1083 pub fn table_id(&self) -> TableId {
1084 self.ident.table_id
1085 }
1086
1087 pub fn full_table_name(&self) -> String {
1089 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1090 }
1091
1092 pub fn get_db_string(&self) -> String {
1093 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1094 }
1095
1096 pub fn is_physical_table(&self) -> bool {
1098 self.meta
1099 .options
1100 .extra_options
1101 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1102 }
1103
1104 pub fn is_ttl_instant_table(&self) -> bool {
1106 self.meta
1107 .options
1108 .ttl
1109 .map(|t| t.is_instant())
1110 .unwrap_or(false)
1111 }
1112}
1113
1114impl TableInfoBuilder {
1115 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1116 Self {
1117 name: Some(name.into()),
1118 meta: Some(meta),
1119 ..Default::default()
1120 }
1121 }
1122
1123 pub fn table_id(mut self, id: TableId) -> Self {
1124 let ident = self.ident.get_or_insert_with(TableIdent::default);
1125 ident.table_id = id;
1126 self
1127 }
1128
1129 pub fn table_version(mut self, version: TableVersion) -> Self {
1130 let ident = self.ident.get_or_insert_with(TableIdent::default);
1131 ident.version = version;
1132 self
1133 }
1134}
1135
1136impl TableIdent {
1137 pub fn new(table_id: TableId) -> Self {
1138 Self {
1139 table_id,
1140 version: 0,
1141 }
1142 }
1143}
1144
1145impl From<TableId> for TableIdent {
1146 fn from(table_id: TableId) -> Self {
1147 Self::new(table_id)
1148 }
1149}
1150
/// Serializable form of [`TableMeta`], holding a [`RawSchema`] instead of a
/// `SchemaRef`.
///
/// `Deserialize` is implemented by hand in this file so that a missing
/// `updated_on` falls back to `created_on`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
pub struct RawTableMeta {
    /// Raw schema of the table.
    pub schema: RawSchema,
    /// Positions of the primary key columns (see [`TableMeta::primary_key_indices`]).
    pub primary_key_indices: Vec<usize>,
    /// Positions of the value columns (see [`TableMeta::value_indices`]).
    pub value_indices: Vec<usize>,
    /// Name of the engine of this table.
    pub engine: String,
    /// Next column id (see [`TableMeta::next_column_id`]).
    pub next_column_id: ColumnId,
    /// Options of the table.
    pub options: TableOptions,
    /// Creation time.
    pub created_on: DateTime<Utc>,
    /// Last update time.
    pub updated_on: DateTime<Utc>,
    /// Positions of the partition columns.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
    /// Column ids of the table.
    #[serde(default)]
    pub column_ids: Vec<ColumnId>,
}
1177
1178impl<'de> Deserialize<'de> for RawTableMeta {
1179 fn deserialize<D>(
1180 deserializer: D,
1181 ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1182 where
1183 D: Deserializer<'de>,
1184 {
1185 #[derive(Deserialize)]
1186 struct Helper {
1187 schema: RawSchema,
1188 primary_key_indices: Vec<usize>,
1189 value_indices: Vec<usize>,
1190 engine: String,
1191 next_column_id: u32,
1192 options: TableOptions,
1193 created_on: DateTime<Utc>,
1194 updated_on: Option<DateTime<Utc>>,
1195 #[serde(default)]
1196 partition_key_indices: Vec<usize>,
1197 #[serde(default)]
1198 column_ids: Vec<ColumnId>,
1199 }
1200
1201 let h = Helper::deserialize(deserializer)?;
1202 Ok(RawTableMeta {
1203 schema: h.schema,
1204 primary_key_indices: h.primary_key_indices,
1205 value_indices: h.value_indices,
1206 engine: h.engine,
1207 next_column_id: h.next_column_id,
1208 options: h.options,
1209 created_on: h.created_on,
1210 updated_on: h.updated_on.unwrap_or(h.created_on),
1211 partition_key_indices: h.partition_key_indices,
1212 column_ids: h.column_ids,
1213 })
1214 }
1215}
1216
1217impl From<TableMeta> for RawTableMeta {
1218 fn from(meta: TableMeta) -> RawTableMeta {
1219 RawTableMeta {
1220 schema: RawSchema::from(&*meta.schema),
1221 primary_key_indices: meta.primary_key_indices,
1222 value_indices: meta.value_indices,
1223 engine: meta.engine,
1224 next_column_id: meta.next_column_id,
1225 options: meta.options,
1226 created_on: meta.created_on,
1227 updated_on: meta.updated_on,
1228 partition_key_indices: meta.partition_key_indices,
1229 column_ids: meta.column_ids,
1230 }
1231 }
1232}
1233
1234impl TryFrom<RawTableMeta> for TableMeta {
1235 type Error = ConvertError;
1236
1237 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1238 Ok(TableMeta {
1239 schema: Arc::new(Schema::try_from(raw.schema)?),
1240 primary_key_indices: raw.primary_key_indices,
1241 value_indices: raw.value_indices,
1242 engine: raw.engine,
1243 next_column_id: raw.next_column_id,
1244 options: raw.options,
1245 created_on: raw.created_on,
1246 updated_on: raw.updated_on,
1247 partition_key_indices: raw.partition_key_indices,
1248 column_ids: raw.column_ids,
1249 })
1250 }
1251}
1252
/// Serializable counterpart of [`TableInfo`]: the same fields, with the
/// metadata stored as a [`RawTableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Table identifier (id and version).
    pub ident: TableIdent,
    /// Table name.
    pub name: String,
    /// Optional description of the table.
    pub desc: Option<String>,
    /// Name of the catalog the table belongs to.
    pub catalog_name: String,
    /// Name of the schema (database) the table belongs to.
    pub schema_name: String,
    /// Raw table metadata.
    pub meta: RawTableMeta,
    /// Type of the table.
    pub table_type: TableType,
}
1264
1265impl RawTableInfo {
1266 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1270 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1271 None
1272 } else {
1273 Some(
1274 self.meta
1275 .column_ids
1276 .iter()
1277 .enumerate()
1278 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1279 .collect(),
1280 )
1281 }
1282 }
1283
1284 pub fn sort_columns(&mut self) {
1286 let column_schemas = &self.meta.schema.column_schemas;
1287 let primary_keys = self
1288 .meta
1289 .primary_key_indices
1290 .iter()
1291 .map(|index| column_schemas[*index].name.clone())
1292 .collect::<HashSet<_>>();
1293
1294 let name_to_ids = self.name_to_ids().unwrap_or_default();
1295 self.meta
1296 .schema
1297 .column_schemas
1298 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1299
1300 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1302 let mut timestamp_index = None;
1303 let mut value_indices =
1304 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1305 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1306 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1307 if primary_keys.contains(&column_schema.name) {
1308 primary_key_indices.push(index);
1309 } else if column_schema.is_time_index() {
1310 value_indices.push(index);
1311 timestamp_index = Some(index);
1312 } else {
1313 value_indices.push(index);
1314 }
1315 if let Some(id) = name_to_ids.get(&column_schema.name) {
1316 column_ids.push(*id);
1317 }
1318 }
1319
1320 self.meta.schema.timestamp_index = timestamp_index;
1322 self.meta.primary_key_indices = primary_key_indices;
1323 self.meta.value_indices = value_indices;
1324 self.meta.column_ids = column_ids;
1325 }
1326
1327 pub fn to_region_options(&self) -> HashMap<String, String> {
1331 HashMap::from(&self.meta.options)
1332 }
1333
1334 pub fn table_ref(&self) -> TableReference<'_> {
1336 TableReference::full(
1337 self.catalog_name.as_str(),
1338 self.schema_name.as_str(),
1339 self.name.as_str(),
1340 )
1341 }
1342}
1343
1344impl From<TableInfo> for RawTableInfo {
1345 fn from(info: TableInfo) -> RawTableInfo {
1346 RawTableInfo {
1347 ident: info.ident,
1348 name: info.name,
1349 desc: info.desc,
1350 catalog_name: info.catalog_name,
1351 schema_name: info.schema_name,
1352 meta: RawTableMeta::from(info.meta),
1353 table_type: info.table_type,
1354 }
1355 }
1356}
1357
1358impl TryFrom<RawTableInfo> for TableInfo {
1359 type Error = ConvertError;
1360
1361 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1362 Ok(TableInfo {
1363 ident: raw.ident,
1364 name: raw.name,
1365 desc: raw.desc,
1366 catalog_name: raw.catalog_name,
1367 schema_name: raw.schema_name,
1368 meta: TableMeta::try_from(raw.meta)?,
1369 table_type: raw.table_type,
1370 })
1371 }
1372}
1373
1374fn set_column_fulltext_options(
1383 column_schema: &mut ColumnSchema,
1384 column_name: &str,
1385 options: &FulltextOptions,
1386 current_options: Option<FulltextOptions>,
1387) -> Result<()> {
1388 if let Some(current_options) = current_options {
1389 ensure!(
1390 current_options.analyzer == options.analyzer
1391 && current_options.case_sensitive == options.case_sensitive,
1392 error::InvalidColumnOptionSnafu {
1393 column_name,
1394 msg: format!(
1395 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1396 current_options.analyzer, current_options.case_sensitive
1397 ),
1398 }
1399 );
1400 }
1401
1402 column_schema
1403 .set_fulltext_options(options)
1404 .context(error::SetFulltextOptionsSnafu { column_name })?;
1405
1406 Ok(())
1407}
1408
1409fn unset_column_fulltext_options(
1410 column_schema: &mut ColumnSchema,
1411 column_name: &str,
1412 current_options: Option<FulltextOptions>,
1413) -> Result<()> {
1414 ensure!(
1415 current_options
1416 .as_ref()
1417 .is_some_and(|options| options.enable),
1418 error::InvalidColumnOptionSnafu {
1419 column_name,
1420 msg: "FULLTEXT index already disabled".to_string(),
1421 }
1422 );
1423
1424 let mut options = current_options.unwrap();
1425 options.enable = false;
1426 column_schema
1427 .set_fulltext_options(&options)
1428 .context(error::SetFulltextOptionsSnafu { column_name })?;
1429
1430 Ok(())
1431}
1432
1433fn set_column_skipping_index_options(
1434 column_schema: &mut ColumnSchema,
1435 column_name: &str,
1436 options: &SkippingIndexOptions,
1437) -> Result<()> {
1438 column_schema
1439 .set_skipping_options(options)
1440 .context(error::SetSkippingOptionsSnafu { column_name })?;
1441
1442 Ok(())
1443}
1444
1445fn unset_column_skipping_index_options(
1446 column_schema: &mut ColumnSchema,
1447 column_name: &str,
1448) -> Result<()> {
1449 column_schema
1450 .unset_skipping_options()
1451 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1452 Ok(())
1453}
1454
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
    };

    use super::*;
    use crate::Error;

    /// Table name passed to `builder_with_alter_kind` by the helpers below.
    const TEST_TABLE_NAME: &str = "my_table";

    /// Builds the schema shared by most tests: `col1` (int32), `ts`
    /// (millisecond timestamp, time index) and `col2` (int32), version 123.
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Builds the default test metadata: the schema from [`new_test_schema`]
    /// with `col1` as the only primary key column.
    fn new_test_meta() -> TableMeta {
        TableMetaBuilder::empty()
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap()
    }

    /// Collects the column names of `meta`'s schema in schema order.
    fn column_names(meta: &TableMeta) -> Vec<String> {
        meta.schema
            .column_schemas()
            .iter()
            .map(|column_schema| column_schema.name.clone())
            .collect()
    }

    /// Applies `alter_kind` to `meta` and returns the altered metadata,
    /// panicking if the alteration is rejected.
    #[track_caller]
    fn alter(meta: &TableMeta, alter_kind: &AlterKind) -> TableMeta {
        meta.builder_with_alter_kind(TEST_TABLE_NAME, alter_kind)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Applies `alter_kind` to `meta` and returns the resulting error,
    /// panicking if the alteration is accepted.
    #[track_caller]
    fn alter_err(meta: &TableMeta, alter_kind: &AlterKind) -> Error {
        meta.builder_with_alter_kind(TEST_TABLE_NAME, alter_kind)
            .err()
            .unwrap()
    }

    #[test]
    fn test_raw_convert() {
        let info = TableInfoBuilder::default()
            .table_id(10)
            .table_version(5)
            .name("mytable")
            .meta(new_test_meta())
            .build()
            .unwrap();

        // A round trip through the raw form must be lossless.
        let raw = RawTableInfo::from(info.clone());
        let info_new = TableInfo::try_from(raw).unwrap();

        assert_eq!(info, info_new);
    }

    /// Appends a key column `my_tag` and a value column `my_field` to `meta`.
    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
        let alter_kind = AlterKind::AddColumns {
            columns: vec![
                AddColumnRequest {
                    column_schema: new_tag,
                    is_key: true,
                    location: None,
                    add_if_not_exists: false,
                },
                AddColumnRequest {
                    column_schema: new_field,
                    is_key: false,
                    location: None,
                    add_if_not_exists: false,
                },
            ],
        };

        alter(meta, &alter_kind)
    }

    /// Adds three columns to `meta` at explicit locations: a key column at
    /// the front, and a value column plus another key column after `ts`.
    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
        let new_field = ColumnSchema::new(
            "my_field_after_ts",
            ConcreteDataType::string_datatype(),
            true,
        );
        let yet_another_field = ColumnSchema::new(
            "yet_another_field_after_ts",
            ConcreteDataType::int64_datatype(),
            true,
        );
        let alter_kind = AlterKind::AddColumns {
            columns: vec![
                AddColumnRequest {
                    column_schema: new_tag,
                    is_key: true,
                    location: Some(AddColumnLocation::First),
                    add_if_not_exists: false,
                },
                AddColumnRequest {
                    column_schema: new_field,
                    is_key: false,
                    location: Some(AddColumnLocation::After {
                        column_name: "ts".to_string(),
                    }),
                    add_if_not_exists: false,
                },
                AddColumnRequest {
                    column_schema: yet_another_field,
                    is_key: true,
                    location: Some(AddColumnLocation::After {
                        column_name: "ts".to_string(),
                    }),
                    add_if_not_exists: false,
                },
            ],
        };

        alter(meta, &alter_kind)
    }

    #[test]
    fn test_add_columns() {
        let new_meta = add_columns_to_meta(&new_test_meta());

        let names = column_names(&new_meta);
        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
    }

    #[test]
    fn test_add_columns_multiple_times() {
        let meta = new_test_meta();

        // The same new column requested twice in a single alteration.
        let add_col3 = || AddColumnRequest {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            is_key: true,
            location: None,
            add_if_not_exists: true,
        };
        let alter_kind = AlterKind::AddColumns {
            columns: vec![add_col3(), add_col3()],
        };

        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_remove_columns() {
        let schema = new_test_schema();
        let meta = add_columns_to_meta(&new_test_meta());

        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col2"), String::from("my_field")],
        };
        let new_meta = alter(&meta, &alter_kind);

        let names = column_names(&new_meta);
        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
        assert_eq!(&[1], &new_meta.value_indices[..]);
        assert_eq!(
            schema.timestamp_column(),
            new_meta.schema.timestamp_column()
        );
    }

    #[test]
    fn test_remove_multiple_columns_before_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = Arc::new(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .version(123)
                .build()
                .unwrap(),
        );
        let meta = TableMetaBuilder::empty()
            .schema(schema.clone())
            .primary_key_indices(vec![1])
            .engine("engine")
            .next_column_id(4)
            .build()
            .unwrap();

        // Both dropped columns sit before `ts`, so the timestamp column
        // moves and must still be resolved correctly afterwards.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col3"), String::from("col1")],
        };
        let new_meta = alter(&meta, &alter_kind);

        let names = column_names(&new_meta);
        assert_eq!(&["col2", "ts"], &names[..]);
        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
        assert_eq!(&[1], &new_meta.value_indices[..]);
        assert_eq!(
            schema.timestamp_column(),
            new_meta.schema.timestamp_column()
        );
    }

    #[test]
    fn test_add_existing_column() {
        let meta = new_test_meta();

        // Without `add_if_not_exists`, re-adding an existing column fails.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
                is_key: false,
                location: None,
                add_if_not_exists: false,
            }],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::TableColumnExists, err.status_code());

        // With `add_if_not_exists` and a matching definition, the columns
        // stay the same and only the schema version is bumped.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
                is_key: true,
                location: None,
                add_if_not_exists: true,
            }],
        };
        let new_meta = alter(&meta, &alter_kind);
        assert_eq!(
            meta.schema.column_schemas(),
            new_meta.schema.column_schemas()
        );
        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
    }

    #[test]
    fn test_add_different_type_column() {
        let meta = new_test_meta();

        // `col1` already exists as int32; requesting it as a string is
        // rejected even with `add_if_not_exists`.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
                is_key: false,
                location: None,
                add_if_not_exists: true,
            }],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_add_invalid_column() {
        let meta = new_test_meta();

        // The new column is declared non-nullable (third argument `false`).
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new(
                    "weny",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                is_key: false,
                location: None,
                add_if_not_exists: false,
            }],
        };

        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_remove_unknown_column() {
        let meta = new_test_meta();

        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("unknown")],
        };

        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
    }

    #[test]
    fn test_change_unknown_column_data_type() {
        let meta = new_test_meta();

        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: "unknown".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };

        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
    }

    #[test]
    fn test_remove_key_column() {
        let meta = new_test_meta();

        // Dropping the primary key column is rejected.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col1")],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());

        // Dropping the time index column is rejected as well.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("ts")],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_remove_partition_column() {
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![])
            .partition_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        // `col1` is a partition key column here, not a primary key column.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col1")],
        };

        let err = alter_err(&meta, &alter_kind);
        assert_matches!(err, Error::RemovePartitionColumn { .. });
    }

    #[test]
    fn test_change_key_column_data_type() {
        let meta = new_test_meta();

        // Changing the type of the primary key column is rejected.
        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: "col1".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());

        // Changing the type of the time index column is rejected as well.
        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: "ts".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_alloc_new_column() {
        let mut meta = new_test_meta();
        assert_eq!(3, meta.next_column_id);

        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();

        // Allocating a column advances the id counter by one.
        assert_eq!(4, meta.next_column_id);
        assert_eq!(column_schema.name, desc.name);
    }

    #[test]
    fn test_add_columns_with_location() {
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .partition_key_indices(vec![0, 2])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        let new_meta = add_columns_to_meta_with_location(&meta);

        let names = column_names(&new_meta);
        assert_eq!(
            &[
                "my_tag_first",
                "col1",
                "ts",
                "yet_another_field_after_ts",
                "my_field_after_ts",
                "col2",
            ],
            &names[..]
        );
        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
        // Partition keys `col1` and `col2` are tracked at their new positions.
        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
    }

    #[test]
    fn test_modify_column_fulltext_options() {
        let meta = new_test_meta();

        // `col1` is an int32 column, so a FULLTEXT index is rejected.
        let alter_kind = AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "col1".to_string(),
                options: FulltextOptions::default(),
            }],
        };
        let err = alter_err(&meta, &alter_kind);
        assert_eq!(
            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
            err.to_string()
        );

        // Enable a FULLTEXT index on a string column.
        let new_meta = add_columns_to_meta_with_location(&meta);
        let alter_kind = AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "my_tag_first".to_string(),
                options: FulltextOptions::new_unchecked(
                    true,
                    FulltextAnalyzer::Chinese,
                    true,
                    FulltextBackend::Bloom,
                    1000,
                    0.01,
                ),
            }],
        };
        let new_meta = alter(&new_meta, &alter_kind);
        let column_schema = new_meta
            .schema
            .column_schema_by_name("my_tag_first")
            .unwrap();
        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
        assert!(fulltext_options.enable);
        assert_eq!(FulltextAnalyzer::Chinese, fulltext_options.analyzer);
        assert!(fulltext_options.case_sensitive);

        // Unsetting keeps the options on the column but disables the index.
        let alter_kind = AlterKind::UnsetIndexes {
            options: vec![UnsetIndexOption::Fulltext {
                column_name: "my_tag_first".to_string(),
            }],
        };
        let new_meta = alter(&new_meta, &alter_kind);
        let column_schema = new_meta
            .schema
            .column_schema_by_name("my_tag_first")
            .unwrap();
        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
        assert!(!fulltext_options.enable);
    }
}