1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
/// Crate-local classification of filter push-down support.
///
/// The variants correspond one-to-one with DataFusion's
/// `TableProviderFilterPushDown`; the `From` impls in this file convert
/// between the two types in both directions.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// Counterpart of `TableProviderFilterPushDown::Unsupported`.
    Unsupported,
    /// Counterpart of `TableProviderFilterPushDown::Inexact`.
    Inexact,
    /// Counterpart of `TableProviderFilterPushDown::Exact`.
    Exact,
}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
/// Kind of a table.
///
/// Rendered by the `Display` impl in this file as `"BASE TABLE"`, `"VIEW"` or
/// `"TEMPORARY"`, and convertible into `datafusion::datasource::TableType`.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// Displayed as `"BASE TABLE"`.
    Base,
    /// Displayed as `"VIEW"`.
    View,
    /// Displayed as `"TEMPORARY"`.
    Temporary,
}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105 fn from(t: TableType) -> datafusion::datasource::TableType {
106 match t {
107 TableType::Base => datafusion::datasource::TableType::Base,
108 TableType::View => datafusion::datasource::TableType::View,
109 TableType::Temporary => datafusion::datasource::TableType::Temporary,
110 }
111 }
112}
113
/// Identifier of a table: a numeric id paired with a version number.
///
/// `Default` yields id `0` and version `0`.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Numeric id of the table.
    pub table_id: TableId,
    /// Version number associated with the table.
    // NOTE(review): what bumps this version is not visible in this file section — confirm.
    pub version: TableVersion,
}
123
/// Metadata of a table: its schema plus the index lists, engine name,
/// options and timestamps declared below.
///
/// `Builder` generates `TableMetaBuilder` with `&mut self` setters
/// (`pattern = "mutable"`). Fields without a `#[builder(default ...)]`
/// attribute must be set before building.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table, shared behind an `Arc`.
    pub schema: SchemaRef,
    /// Positions (into the schema's column list) of the primary key columns.
    pub primary_key_indices: Vec<usize>,
    /// Builder default: every column position of the schema that is not
    /// listed in `primary_key_indices` (see `default_value_indices`).
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Engine name; the builder defaults it to an empty string.
    #[builder(default, setter(into))]
    pub engine: String,
    /// Id handed out to the next column allocated via `alloc_new_column`,
    /// which increments it afterwards.
    pub next_column_id: ColumnId,
    /// Table options; the builder defaults to `TableOptions::default()`.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation timestamp; the builder defaults it to the current UTC time.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Last-update timestamp; the builder defaults it to `created_on` when
    /// that was set on the builder (see `default_updated_on`).
    #[builder(default = "self.default_updated_on()")]
    pub updated_on: DateTime<Utc>,
    /// Positions (into the schema's column list) of the partition columns;
    /// the builder defaults to an empty list.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; the builder defaults to an empty list.
    // NOTE(review): presumably parallel to the schema's columns — confirm against callers.
    #[builder(default = "Vec::new()")]
    pub column_ids: Vec<ColumnId>,
}
151
152impl TableMetaBuilder {
153 #[cfg(any(test, feature = "testing"))]
155 pub fn empty() -> Self {
156 Self {
157 schema: None,
158 primary_key_indices: None,
159 value_indices: None,
160 engine: None,
161 next_column_id: None,
162 options: None,
163 created_on: None,
164 updated_on: None,
165 partition_key_indices: None,
166 column_ids: None,
167 }
168 }
169}
170
171impl TableMetaBuilder {
172 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
173 match (&self.primary_key_indices, &self.schema) {
174 (Some(v), Some(schema)) => {
175 let column_schemas = schema.column_schemas();
176 Ok((0..column_schemas.len())
177 .filter(|idx| !v.contains(idx))
178 .collect())
179 }
180 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
181 }
182 }
183
184 fn default_updated_on(&self) -> DateTime<Utc> {
185 self.created_on.unwrap_or_default()
186 }
187
188 pub fn new_external_table() -> Self {
189 Self {
190 schema: None,
191 primary_key_indices: Some(Vec::new()),
192 value_indices: Some(Vec::new()),
193 engine: None,
194 next_column_id: Some(0),
195 options: None,
196 created_on: None,
197 updated_on: None,
198 partition_key_indices: None,
199 column_ids: None,
200 }
201 }
202}
203
/// Add-column requests grouped by their requested location, as produced by
/// `TableMeta::split_requests_by_column_location`.
struct SplitResult<'a> {
    /// Requests whose location is `AddColumnLocation::First`, in request order.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Requests whose location is `AddColumnLocation::After`, keyed by the
    /// name of the existing column they follow, in request order per key.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Requests without a location, in request order.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all columns to add, in request order.
    column_names: Vec<String>,
}
215
216impl TableMeta {
217 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
218 let columns_schemas = &self.schema.column_schemas();
219 self.primary_key_indices
220 .iter()
221 .map(|idx| &columns_schemas[*idx].name)
222 }
223
224 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
225 let columns_schemas = self.schema.column_schemas();
227 let primary_key_indices = &self.primary_key_indices;
228 columns_schemas
229 .iter()
230 .enumerate()
231 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
232 .map(|(_, cs)| &cs.name)
233 }
234
235 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
236 let columns_schemas = &self.schema.column_schemas();
237 self.partition_key_indices
238 .iter()
239 .map(|idx| &columns_schemas[*idx].name)
240 }
241
242 pub fn partition_columns(&self) -> impl Iterator<Item = &ColumnSchema> {
243 self.partition_key_indices
244 .iter()
245 .map(|idx| &self.schema.column_schemas()[*idx])
246 }
247
248 pub fn builder_with_alter_kind(
252 &self,
253 table_name: &str,
254 alter_kind: &AlterKind,
255 ) -> Result<TableMetaBuilder> {
256 let mut builder = match alter_kind {
257 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
258 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
259 AlterKind::ModifyColumnTypes { columns } => {
260 self.modify_column_types(table_name, columns)
261 }
262 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
264 AlterKind::SetTableOptions { options } => self.set_table_options(options),
265 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
266 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
267 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
268 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
269 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
270 }?;
271 let _ = builder.updated_on(Utc::now());
272 Ok(builder)
273 }
274
275 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
277 let mut new_options = self.options.clone();
278
279 for request in requests {
280 match request {
281 SetRegionOption::Ttl(new_ttl) => {
282 new_options.ttl = *new_ttl;
283 }
284 SetRegionOption::Twsc(key, value) => {
285 if !value.is_empty() {
286 new_options.extra_options.insert(key.clone(), value.clone());
287 new_options.extra_options.insert(
289 COMPACTION_TYPE.to_string(),
290 COMPACTION_TYPE_TWCS.to_string(),
291 );
292 } else {
293 new_options.extra_options.remove(key.as_str());
295 }
296 }
297 SetRegionOption::Format(value) => {
298 new_options
299 .extra_options
300 .insert(SST_FORMAT_KEY.to_string(), value.clone());
301 }
302 }
303 }
304 let mut builder = self.new_meta_builder();
305 builder.options(new_options);
306
307 Ok(builder)
308 }
309
310 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
311 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
312 self.set_table_options(&requests)
313 }
314
315 fn set_indexes(
316 &self,
317 table_name: &str,
318 requests: &[SetIndexOption],
319 ) -> Result<TableMetaBuilder> {
320 let table_schema = &self.schema;
321 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
322 for request in requests {
323 let column_name = request.column_name();
324 table_schema
325 .column_index_by_name(column_name)
326 .with_context(|| error::ColumnNotExistsSnafu {
327 column_name,
328 table_name,
329 })?;
330 set_index_options
331 .entry(column_name)
332 .or_default()
333 .push(request);
334 }
335
336 let mut meta_builder = self.new_meta_builder();
337 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
338 for mut column in table_schema.column_schemas().iter().cloned() {
339 if let Some(request) = set_index_options.get(column.name.as_str()) {
340 for request in request {
341 self.set_index(&mut column, request)?;
342 }
343 }
344 columns.push(column);
345 }
346
347 let mut builder = SchemaBuilder::try_from_columns(columns)
348 .with_context(|_| error::SchemaBuildSnafu {
349 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
350 })?
351 .version(table_schema.version() + 1);
352
353 for (k, v) in table_schema.metadata().iter() {
354 builder = builder.add_metadata(k, v);
355 }
356
357 let new_schema = builder.build().with_context(|_| {
358 let column_names = requests
359 .iter()
360 .map(|request| request.column_name())
361 .collect::<Vec<_>>();
362 error::SchemaBuildSnafu {
363 msg: format!(
364 "Table {table_name} cannot set index options with columns {column_names:?}",
365 ),
366 }
367 })?;
368 let _ = meta_builder
369 .schema(Arc::new(new_schema))
370 .primary_key_indices(self.primary_key_indices.clone());
371
372 Ok(meta_builder)
373 }
374
375 fn unset_indexes(
376 &self,
377 table_name: &str,
378 requests: &[UnsetIndexOption],
379 ) -> Result<TableMetaBuilder> {
380 let table_schema = &self.schema;
381 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
382 for request in requests {
383 let column_name = request.column_name();
384 table_schema
385 .column_index_by_name(column_name)
386 .with_context(|| error::ColumnNotExistsSnafu {
387 column_name,
388 table_name,
389 })?;
390 set_index_options
391 .entry(column_name)
392 .or_default()
393 .push(request);
394 }
395
396 let mut meta_builder = self.new_meta_builder();
397 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
398 for mut column in table_schema.column_schemas().iter().cloned() {
399 if let Some(request) = set_index_options.get(column.name.as_str()) {
400 for request in request {
401 self.unset_index(&mut column, request)?;
402 }
403 }
404 columns.push(column);
405 }
406
407 let mut builder = SchemaBuilder::try_from_columns(columns)
408 .with_context(|_| error::SchemaBuildSnafu {
409 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
410 })?
411 .version(table_schema.version() + 1);
412
413 for (k, v) in table_schema.metadata().iter() {
414 builder = builder.add_metadata(k, v);
415 }
416
417 let new_schema = builder.build().with_context(|_| {
418 let column_names = requests
419 .iter()
420 .map(|request| request.column_name())
421 .collect::<Vec<_>>();
422 error::SchemaBuildSnafu {
423 msg: format!(
424 "Table {table_name} cannot set index options with columns {column_names:?}",
425 ),
426 }
427 })?;
428 let _ = meta_builder
429 .schema(Arc::new(new_schema))
430 .primary_key_indices(self.primary_key_indices.clone());
431
432 Ok(meta_builder)
433 }
434
435 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
436 match request {
437 SetIndexOption::Fulltext {
438 column_name,
439 options,
440 } => {
441 ensure!(
442 column_schema.data_type.is_string(),
443 error::InvalidColumnOptionSnafu {
444 column_name,
445 msg: "FULLTEXT index only supports string type",
446 }
447 );
448
449 let current_fulltext_options = column_schema
450 .fulltext_options()
451 .context(error::SetFulltextOptionsSnafu { column_name })?;
452 set_column_fulltext_options(
453 column_schema,
454 column_name,
455 options,
456 current_fulltext_options,
457 )?;
458 }
459 SetIndexOption::Inverted { column_name } => {
460 debug_assert_eq!(column_schema.name, *column_name);
461 column_schema.set_inverted_index(true);
462 }
463 SetIndexOption::Skipping {
464 column_name,
465 options,
466 } => {
467 set_column_skipping_index_options(column_schema, column_name, options)?;
468 }
469 }
470
471 Ok(())
472 }
473
474 fn unset_index(
475 &self,
476 column_schema: &mut ColumnSchema,
477 request: &UnsetIndexOption,
478 ) -> Result<()> {
479 match request {
480 UnsetIndexOption::Fulltext { column_name } => {
481 let current_fulltext_options = column_schema
482 .fulltext_options()
483 .context(error::SetFulltextOptionsSnafu { column_name })?;
484 unset_column_fulltext_options(
485 column_schema,
486 column_name,
487 current_fulltext_options.clone(),
488 )?
489 }
490 UnsetIndexOption::Inverted { .. } => {
491 column_schema.set_inverted_index(false);
492 }
493 UnsetIndexOption::Skipping { column_name } => {
494 unset_column_skipping_index_options(column_schema, column_name)?;
495 }
496 }
497
498 Ok(())
499 }
500
501 pub fn alloc_new_column(
506 &mut self,
507 table_name: &str,
508 new_column: &ColumnSchema,
509 ) -> Result<ColumnDescriptor> {
510 let desc = ColumnDescriptorBuilder::new(
511 self.next_column_id as ColumnId,
512 &new_column.name,
513 new_column.data_type.clone(),
514 )
515 .is_nullable(new_column.is_nullable())
516 .default_constraint(new_column.default_constraint().cloned())
517 .build()
518 .context(error::BuildColumnDescriptorSnafu {
519 table_name,
520 column_name: &new_column.name,
521 })?;
522
523 self.next_column_id += 1;
525
526 Ok(desc)
527 }
528
529 fn new_meta_builder(&self) -> TableMetaBuilder {
531 let mut builder = TableMetaBuilder::from(self);
532 builder.value_indices = None;
534 builder
535 }
536
537 fn add_columns(
539 &self,
540 table_name: &str,
541 requests: &[AddColumnRequest],
542 ) -> Result<TableMetaBuilder> {
543 let table_schema = &self.schema;
544 let mut meta_builder = self.new_meta_builder();
545 let original_primary_key_indices: HashSet<&usize> =
546 self.primary_key_indices.iter().collect();
547
548 let mut names = HashSet::with_capacity(requests.len());
549 let mut new_columns = Vec::with_capacity(requests.len());
550 for col_to_add in requests {
551 if let Some(column_schema) =
552 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
553 {
554 ensure!(
556 col_to_add.add_if_not_exists,
557 error::ColumnExistsSnafu {
558 table_name,
559 column_name: &col_to_add.column_schema.name
560 },
561 );
562
563 ensure!(
565 column_schema.data_type == col_to_add.column_schema.data_type,
566 error::InvalidAlterRequestSnafu {
567 table: table_name,
568 err: format!(
569 "column {} already exists with different type {:?}",
570 col_to_add.column_schema.name, column_schema.data_type,
571 ),
572 }
573 );
574 } else {
575 ensure!(
578 names.insert(&col_to_add.column_schema.name),
579 error::InvalidAlterRequestSnafu {
580 table: table_name,
581 err: format!(
582 "add column {} more than once",
583 col_to_add.column_schema.name
584 ),
585 }
586 );
587
588 ensure!(
589 col_to_add.column_schema.is_nullable()
590 || col_to_add.column_schema.default_constraint().is_some(),
591 error::InvalidAlterRequestSnafu {
592 table: table_name,
593 err: format!(
594 "no default value for column {}",
595 col_to_add.column_schema.name
596 ),
597 },
598 );
599
600 new_columns.push(col_to_add.clone());
601 }
602 }
603 let requests = &new_columns[..];
604
605 let SplitResult {
606 columns_at_first,
607 columns_at_after,
608 columns_at_last,
609 column_names,
610 } = self.split_requests_by_column_location(table_name, requests)?;
611 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
612 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
613 columns_at_first.iter().rev().for_each(|request| {
615 if request.is_key {
616 primary_key_indices.push(columns.len());
618 }
619 columns.push(request.column_schema.clone());
620 });
621 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
623 if original_primary_key_indices.contains(&index) {
624 primary_key_indices.push(columns.len());
625 }
626 columns.push(column_schema.clone());
627 if let Some(requests) = columns_at_after.get(&column_schema.name) {
628 requests.iter().rev().for_each(|request| {
629 if request.is_key {
630 primary_key_indices.push(columns.len());
632 }
633 columns.push(request.column_schema.clone());
634 });
635 }
636 }
637 columns_at_last.iter().for_each(|request| {
639 if request.is_key {
640 primary_key_indices.push(columns.len());
642 }
643 columns.push(request.column_schema.clone());
644 });
645
646 let mut builder = SchemaBuilder::try_from(columns)
647 .with_context(|_| error::SchemaBuildSnafu {
648 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
649 })?
650 .version(table_schema.version() + 1);
652 for (k, v) in table_schema.metadata().iter() {
653 builder = builder.add_metadata(k, v);
654 }
655 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
656 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
657 })?;
658
659 let partition_key_indices = self
660 .partition_key_indices
661 .iter()
662 .map(|idx| table_schema.column_name_by_index(*idx))
663 .map(|name| new_schema.column_index_by_name(name).unwrap())
665 .collect();
666
667 let _ = meta_builder
669 .schema(Arc::new(new_schema))
670 .primary_key_indices(primary_key_indices)
671 .partition_key_indices(partition_key_indices);
672
673 Ok(meta_builder)
674 }
675
676 fn remove_columns(
677 &self,
678 table_name: &str,
679 column_names: &[String],
680 ) -> Result<TableMetaBuilder> {
681 let table_schema = &self.schema;
682 let column_names: HashSet<_> = column_names.iter().collect();
683 let mut meta_builder = self.new_meta_builder();
684
685 let timestamp_index = table_schema.timestamp_index();
686 for column_name in &column_names {
688 if let Some(index) = table_schema.column_index_by_name(column_name) {
689 ensure!(
692 !self.primary_key_indices.contains(&index),
693 error::RemoveColumnInIndexSnafu {
694 column_name: *column_name,
695 table_name,
696 }
697 );
698
699 ensure!(
700 !self.partition_key_indices.contains(&index),
701 error::RemovePartitionColumnSnafu {
702 column_name: *column_name,
703 table_name,
704 }
705 );
706
707 if let Some(ts_index) = timestamp_index {
708 ensure!(
710 index != ts_index,
711 error::RemoveColumnInIndexSnafu {
712 column_name: table_schema.column_name_by_index(ts_index),
713 table_name,
714 }
715 );
716 }
717 } else {
718 return error::ColumnNotExistsSnafu {
719 column_name: *column_name,
720 table_name,
721 }
722 .fail()?;
723 }
724 }
725
726 let columns: Vec<_> = table_schema
728 .column_schemas()
729 .iter()
730 .filter(|column_schema| !column_names.contains(&column_schema.name))
731 .cloned()
732 .collect();
733
734 let mut builder = SchemaBuilder::try_from_columns(columns)
735 .with_context(|_| error::SchemaBuildSnafu {
736 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
737 })?
738 .version(table_schema.version() + 1);
740 for (k, v) in table_schema.metadata().iter() {
741 builder = builder.add_metadata(k, v);
742 }
743 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
744 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
745 })?;
746
747 let primary_key_indices = self
749 .primary_key_indices
750 .iter()
751 .map(|idx| table_schema.column_name_by_index(*idx))
752 .map(|name| new_schema.column_index_by_name(name).unwrap())
754 .collect();
755
756 let partition_key_indices = self
757 .partition_key_indices
758 .iter()
759 .map(|idx| table_schema.column_name_by_index(*idx))
760 .map(|name| new_schema.column_index_by_name(name).unwrap())
762 .collect();
763
764 let _ = meta_builder
765 .schema(Arc::new(new_schema))
766 .primary_key_indices(primary_key_indices)
767 .partition_key_indices(partition_key_indices);
768
769 Ok(meta_builder)
770 }
771
772 fn modify_column_types(
773 &self,
774 table_name: &str,
775 requests: &[ModifyColumnTypeRequest],
776 ) -> Result<TableMetaBuilder> {
777 let table_schema = &self.schema;
778 let mut meta_builder = self.new_meta_builder();
779
780 let mut modify_column_types = HashMap::with_capacity(requests.len());
781 let timestamp_index = table_schema.timestamp_index();
782
783 for col_to_change in requests {
784 let change_column_name = &col_to_change.column_name;
785
786 let index = table_schema
787 .column_index_by_name(change_column_name)
788 .with_context(|| error::ColumnNotExistsSnafu {
789 column_name: change_column_name,
790 table_name,
791 })?;
792
793 let column = &table_schema.column_schemas()[index];
794
795 ensure!(
796 !self.primary_key_indices.contains(&index),
797 error::InvalidAlterRequestSnafu {
798 table: table_name,
799 err: format!(
800 "Not allowed to change primary key index column '{}'",
801 column.name
802 )
803 }
804 );
805
806 if let Some(ts_index) = timestamp_index {
807 ensure!(
809 index != ts_index,
810 error::InvalidAlterRequestSnafu {
811 table: table_name,
812 err: format!(
813 "Not allowed to change timestamp index column '{}' datatype",
814 column.name
815 )
816 }
817 );
818 }
819
820 ensure!(
821 modify_column_types
822 .insert(&col_to_change.column_name, col_to_change)
823 .is_none(),
824 error::InvalidAlterRequestSnafu {
825 table: table_name,
826 err: format!(
827 "change column datatype {} more than once",
828 col_to_change.column_name
829 ),
830 }
831 );
832
833 ensure!(
834 column
835 .data_type
836 .can_arrow_type_cast_to(&col_to_change.target_type),
837 error::InvalidAlterRequestSnafu {
838 table: table_name,
839 err: format!(
840 "column '{}' cannot be cast automatically to type '{}'",
841 col_to_change.column_name, col_to_change.target_type,
842 ),
843 }
844 );
845
846 ensure!(
847 column.is_nullable(),
848 error::InvalidAlterRequestSnafu {
849 table: table_name,
850 err: format!(
851 "column '{}' must be nullable to ensure safe conversion.",
852 col_to_change.column_name,
853 ),
854 }
855 );
856 }
857 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
860 for mut column in table_schema.column_schemas().iter().cloned() {
861 if let Some(change_column) = modify_column_types.get(&column.name) {
862 column.data_type = change_column.target_type.clone();
863 let new_default = if let Some(default_value) = column.default_constraint() {
864 Some(
865 default_value
866 .cast_to_datatype(&change_column.target_type)
867 .with_context(|_| error::CastDefaultValueSnafu {
868 reason: format!(
869 "Failed to cast default value from {:?} to type {:?}",
870 default_value, &change_column.target_type
871 ),
872 })?,
873 )
874 } else {
875 None
876 };
877 column = column
878 .clone()
879 .with_default_constraint(new_default.clone())
880 .with_context(|_| error::CastDefaultValueSnafu {
881 reason: format!("Failed to set new default: {:?}", new_default),
882 })?;
883 }
884 columns.push(column)
885 }
886
887 let mut builder = SchemaBuilder::try_from_columns(columns)
888 .with_context(|_| error::SchemaBuildSnafu {
889 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
890 })?
891 .version(table_schema.version() + 1);
893 for (k, v) in table_schema.metadata().iter() {
894 builder = builder.add_metadata(k, v);
895 }
896 let new_schema = builder.build().with_context(|_| {
897 let column_names: Vec<_> = requests
898 .iter()
899 .map(|request| &request.column_name)
900 .collect();
901
902 error::SchemaBuildSnafu {
903 msg: format!(
904 "Table {table_name} cannot change datatype with columns {column_names:?}"
905 ),
906 }
907 })?;
908
909 let _ = meta_builder
910 .schema(Arc::new(new_schema))
911 .primary_key_indices(self.primary_key_indices.clone());
912
913 Ok(meta_builder)
914 }
915
916 fn split_requests_by_column_location<'a>(
918 &self,
919 table_name: &str,
920 requests: &'a [AddColumnRequest],
921 ) -> Result<SplitResult<'a>> {
922 let table_schema = &self.schema;
923 let mut columns_at_first = Vec::new();
924 let mut columns_at_after = HashMap::new();
925 let mut columns_at_last = Vec::new();
926 let mut column_names = Vec::with_capacity(requests.len());
927 for request in requests {
928 let column_name = &request.column_schema.name;
930 column_names.push(column_name.clone());
931 ensure!(
932 table_schema.column_schema_by_name(column_name).is_none(),
933 error::ColumnExistsSnafu {
934 column_name,
935 table_name,
936 }
937 );
938 match request.location.as_ref() {
939 Some(AddColumnLocation::First) => {
940 columns_at_first.push(request);
941 }
942 Some(AddColumnLocation::After { column_name }) => {
943 ensure!(
944 table_schema.column_schema_by_name(column_name).is_some(),
945 error::ColumnNotExistsSnafu {
946 column_name,
947 table_name,
948 }
949 );
950 columns_at_after
951 .entry(column_name.clone())
952 .or_insert(Vec::new())
953 .push(request);
954 }
955 None => {
956 columns_at_last.push(request);
957 }
958 }
959 }
960 Ok(SplitResult {
961 columns_at_first,
962 columns_at_after,
963 columns_at_last,
964 column_names,
965 })
966 }
967
968 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
969 let table_schema = &self.schema;
970 let mut meta_builder = self.new_meta_builder();
971 let mut columns = Vec::with_capacity(table_schema.num_columns());
972 for column_schema in table_schema.column_schemas() {
973 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
974 ensure!(
976 column_schema.default_constraint().is_some(),
977 error::InvalidAlterRequestSnafu {
978 table: table_name,
979 err: format!("column {name} does not have a default value"),
980 }
981 );
982 if !column_schema.is_nullable() {
983 return error::InvalidAlterRequestSnafu {
984 table: table_name,
985 err: format!(
986 "column {name} is not nullable and `default` cannot be dropped",
987 ),
988 }
989 .fail();
990 }
991 let new_column_schema = column_schema.clone();
992 let new_column_schema = new_column_schema
993 .with_default_constraint(None)
994 .with_context(|_| error::SchemaBuildSnafu {
995 msg: format!("Table {table_name} cannot drop default values"),
996 })?;
997 columns.push(new_column_schema);
998 } else {
999 columns.push(column_schema.clone());
1000 }
1001 }
1002
1003 let mut builder = SchemaBuilder::try_from_columns(columns)
1004 .with_context(|_| error::SchemaBuildSnafu {
1005 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1006 })?
1007 .version(table_schema.version() + 1);
1009 for (k, v) in table_schema.metadata().iter() {
1010 builder = builder.add_metadata(k, v);
1011 }
1012 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1013 msg: format!("Table {table_name} cannot drop default values"),
1014 })?;
1015
1016 let _ = meta_builder.schema(Arc::new(new_schema));
1017
1018 Ok(meta_builder)
1019 }
1020
1021 fn set_defaults(
1022 &self,
1023 table_name: &str,
1024 set_defaults: &[SetDefaultRequest],
1025 ) -> Result<TableMetaBuilder> {
1026 let table_schema = &self.schema;
1027 let mut meta_builder = self.new_meta_builder();
1028 let mut columns = Vec::with_capacity(table_schema.num_columns());
1029 for column_schema in table_schema.column_schemas() {
1030 if let Some(set_default) = set_defaults
1031 .iter()
1032 .find(|s| s.column_name == column_schema.name)
1033 {
1034 let new_column_schema = column_schema.clone();
1035 let new_column_schema = new_column_schema
1036 .with_default_constraint(set_default.default_constraint.clone())
1037 .with_context(|_| error::SchemaBuildSnafu {
1038 msg: format!("Table {table_name} cannot set default values"),
1039 })?;
1040 columns.push(new_column_schema);
1041 } else {
1042 columns.push(column_schema.clone());
1043 }
1044 }
1045
1046 let mut builder = SchemaBuilder::try_from_columns(columns)
1047 .with_context(|_| error::SchemaBuildSnafu {
1048 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1049 })?
1050 .version(table_schema.version() + 1);
1052 for (k, v) in table_schema.metadata().iter() {
1053 builder = builder.add_metadata(k, v);
1054 }
1055 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1056 msg: format!("Table {table_name} cannot set default values"),
1057 })?;
1058
1059 let _ = meta_builder.schema(Arc::new(new_schema));
1060
1061 Ok(meta_builder)
1062 }
1063}
1064
/// Information about a table: identifier, names, description, metadata and
/// table type.
///
/// `Builder` generates `TableInfoBuilder` with by-value setters
/// (`pattern = "owned"`).
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table; the builder defaults to
    /// `TableIdent::default()`.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table; required by the builder.
    #[builder(setter(into))]
    pub name: String,
    /// Optional description; the builder defaults to `None`.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Catalog name; the builder defaults to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Schema (database) name; the builder defaults to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table; required by the builder.
    pub meta: TableMeta,
    /// Kind of the table; the builder defaults to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
1085
1086pub type TableInfoRef = Arc<TableInfo>;
1087
1088impl TableInfo {
1089 pub fn table_id(&self) -> TableId {
1090 self.ident.table_id
1091 }
1092
1093 pub fn full_table_name(&self) -> String {
1095 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1096 }
1097
1098 pub fn get_db_string(&self) -> String {
1099 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1100 }
1101
1102 pub fn is_physical_table(&self) -> bool {
1104 self.meta
1105 .options
1106 .extra_options
1107 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1108 }
1109
1110 pub fn is_ttl_instant_table(&self) -> bool {
1112 self.meta
1113 .options
1114 .ttl
1115 .map(|t| t.is_instant())
1116 .unwrap_or(false)
1117 }
1118}
1119
1120impl TableInfoBuilder {
1121 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1122 Self {
1123 name: Some(name.into()),
1124 meta: Some(meta),
1125 ..Default::default()
1126 }
1127 }
1128
1129 pub fn table_id(mut self, id: TableId) -> Self {
1130 let ident = self.ident.get_or_insert_with(TableIdent::default);
1131 ident.table_id = id;
1132 self
1133 }
1134
1135 pub fn table_version(mut self, version: TableVersion) -> Self {
1136 let ident = self.ident.get_or_insert_with(TableIdent::default);
1137 ident.version = version;
1138 self
1139 }
1140}
1141
1142impl TableIdent {
1143 pub fn new(table_id: TableId) -> Self {
1144 Self {
1145 table_id,
1146 version: 0,
1147 }
1148 }
1149}
1150
1151impl From<TableId> for TableIdent {
1152 fn from(table_id: TableId) -> Self {
1153 Self::new(table_id)
1154 }
1155}
1156
/// Serializable counterpart of `TableMeta`, holding a `RawSchema` instead
/// of a shared schema reference.
///
/// Only `Serialize` is derived; `Deserialize` is implemented by hand in this
/// file so that a missing `updated_on` can fall back to `created_on`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
pub struct RawTableMeta {
    /// Raw form of the table schema.
    pub schema: RawSchema,
    /// Positions of the primary key columns; mirrors
    /// `TableMeta::primary_key_indices`.
    pub primary_key_indices: Vec<usize>,
    /// Mirrors `TableMeta::value_indices`.
    pub value_indices: Vec<usize>,
    /// Engine name; mirrors `TableMeta::engine`.
    pub engine: String,
    /// Mirrors `TableMeta::next_column_id`.
    pub next_column_id: ColumnId,
    /// Table options; mirrors `TableMeta::options`.
    pub options: TableOptions,
    /// Creation timestamp.
    pub created_on: DateTime<Utc>,
    /// Last-update timestamp; the hand-written deserializer substitutes
    /// `created_on` when the field is absent or null.
    pub updated_on: DateTime<Utc>,
    /// Positions of the partition columns; empty when absent from the
    /// serialized form.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; empty when absent from the serialized form.
    #[serde(default)]
    pub column_ids: Vec<ColumnId>,
}
1183
1184impl<'de> Deserialize<'de> for RawTableMeta {
1185 fn deserialize<D>(
1186 deserializer: D,
1187 ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1188 where
1189 D: Deserializer<'de>,
1190 {
1191 #[derive(Deserialize)]
1192 struct Helper {
1193 schema: RawSchema,
1194 primary_key_indices: Vec<usize>,
1195 value_indices: Vec<usize>,
1196 engine: String,
1197 next_column_id: u32,
1198 options: TableOptions,
1199 created_on: DateTime<Utc>,
1200 updated_on: Option<DateTime<Utc>>,
1201 #[serde(default)]
1202 partition_key_indices: Vec<usize>,
1203 #[serde(default)]
1204 column_ids: Vec<ColumnId>,
1205 }
1206
1207 let h = Helper::deserialize(deserializer)?;
1208 Ok(RawTableMeta {
1209 schema: h.schema,
1210 primary_key_indices: h.primary_key_indices,
1211 value_indices: h.value_indices,
1212 engine: h.engine,
1213 next_column_id: h.next_column_id,
1214 options: h.options,
1215 created_on: h.created_on,
1216 updated_on: h.updated_on.unwrap_or(h.created_on),
1217 partition_key_indices: h.partition_key_indices,
1218 column_ids: h.column_ids,
1219 })
1220 }
1221}
1222
1223impl From<TableMeta> for RawTableMeta {
1224 fn from(meta: TableMeta) -> RawTableMeta {
1225 RawTableMeta {
1226 schema: RawSchema::from(&*meta.schema),
1227 primary_key_indices: meta.primary_key_indices,
1228 value_indices: meta.value_indices,
1229 engine: meta.engine,
1230 next_column_id: meta.next_column_id,
1231 options: meta.options,
1232 created_on: meta.created_on,
1233 updated_on: meta.updated_on,
1234 partition_key_indices: meta.partition_key_indices,
1235 column_ids: meta.column_ids,
1236 }
1237 }
1238}
1239
1240impl TryFrom<RawTableMeta> for TableMeta {
1241 type Error = ConvertError;
1242
1243 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1244 Ok(TableMeta {
1245 schema: Arc::new(Schema::try_from(raw.schema)?),
1246 primary_key_indices: raw.primary_key_indices,
1247 value_indices: raw.value_indices,
1248 engine: raw.engine,
1249 next_column_id: raw.next_column_id,
1250 options: raw.options,
1251 created_on: raw.created_on,
1252 updated_on: raw.updated_on,
1253 partition_key_indices: raw.partition_key_indices,
1254 column_ids: raw.column_ids,
1255 })
1256 }
1257}
1258
/// Serializable form of [`TableInfo`].
///
/// It carries the same fields as [`TableInfo`], except that the metadata is stored
/// as a [`RawTableMeta`]. Conversions exist in both directions (`From<TableInfo>`
/// and `TryFrom<RawTableInfo>`).
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Identifier of the table (table id and version).
    pub ident: TableIdent,
    /// Name of the table.
    pub name: String,
    /// Optional description of the table.
    pub desc: Option<String>,
    /// Name of the catalog the table belongs to.
    pub catalog_name: String,
    /// Name of the schema the table belongs to.
    pub schema_name: String,
    /// Raw (serializable) table metadata.
    pub meta: RawTableMeta,
    /// Type of the table.
    pub table_type: TableType,
}
1270
1271impl RawTableInfo {
1272 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1276 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1277 None
1278 } else {
1279 Some(
1280 self.meta
1281 .column_ids
1282 .iter()
1283 .enumerate()
1284 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1285 .collect(),
1286 )
1287 }
1288 }
1289
1290 pub fn sort_columns(&mut self) {
1292 let column_schemas = &self.meta.schema.column_schemas;
1293 let primary_keys = self
1294 .meta
1295 .primary_key_indices
1296 .iter()
1297 .map(|index| column_schemas[*index].name.clone())
1298 .collect::<HashSet<_>>();
1299
1300 let name_to_ids = self.name_to_ids().unwrap_or_default();
1301 self.meta
1302 .schema
1303 .column_schemas
1304 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1305
1306 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1308 let mut timestamp_index = None;
1309 let mut value_indices =
1310 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1311 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1312 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1313 if primary_keys.contains(&column_schema.name) {
1314 primary_key_indices.push(index);
1315 } else if column_schema.is_time_index() {
1316 value_indices.push(index);
1317 timestamp_index = Some(index);
1318 } else {
1319 value_indices.push(index);
1320 }
1321 if let Some(id) = name_to_ids.get(&column_schema.name) {
1322 column_ids.push(*id);
1323 }
1324 }
1325
1326 self.meta.schema.timestamp_index = timestamp_index;
1328 self.meta.primary_key_indices = primary_key_indices;
1329 self.meta.value_indices = value_indices;
1330 self.meta.column_ids = column_ids;
1331 }
1332
1333 pub fn to_region_options(&self) -> HashMap<String, String> {
1337 HashMap::from(&self.meta.options)
1338 }
1339
1340 pub fn table_ref(&self) -> TableReference<'_> {
1342 TableReference::full(
1343 self.catalog_name.as_str(),
1344 self.schema_name.as_str(),
1345 self.name.as_str(),
1346 )
1347 }
1348}
1349
1350impl From<TableInfo> for RawTableInfo {
1351 fn from(info: TableInfo) -> RawTableInfo {
1352 RawTableInfo {
1353 ident: info.ident,
1354 name: info.name,
1355 desc: info.desc,
1356 catalog_name: info.catalog_name,
1357 schema_name: info.schema_name,
1358 meta: RawTableMeta::from(info.meta),
1359 table_type: info.table_type,
1360 }
1361 }
1362}
1363
1364impl TryFrom<RawTableInfo> for TableInfo {
1365 type Error = ConvertError;
1366
1367 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1368 Ok(TableInfo {
1369 ident: raw.ident,
1370 name: raw.name,
1371 desc: raw.desc,
1372 catalog_name: raw.catalog_name,
1373 schema_name: raw.schema_name,
1374 meta: TableMeta::try_from(raw.meta)?,
1375 table_type: raw.table_type,
1376 })
1377 }
1378}
1379
1380fn set_column_fulltext_options(
1389 column_schema: &mut ColumnSchema,
1390 column_name: &str,
1391 options: &FulltextOptions,
1392 current_options: Option<FulltextOptions>,
1393) -> Result<()> {
1394 if let Some(current_options) = current_options {
1395 ensure!(
1396 current_options.analyzer == options.analyzer
1397 && current_options.case_sensitive == options.case_sensitive,
1398 error::InvalidColumnOptionSnafu {
1399 column_name,
1400 msg: format!(
1401 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1402 current_options.analyzer, current_options.case_sensitive
1403 ),
1404 }
1405 );
1406 }
1407
1408 column_schema
1409 .set_fulltext_options(options)
1410 .context(error::SetFulltextOptionsSnafu { column_name })?;
1411
1412 Ok(())
1413}
1414
1415fn unset_column_fulltext_options(
1416 column_schema: &mut ColumnSchema,
1417 column_name: &str,
1418 current_options: Option<FulltextOptions>,
1419) -> Result<()> {
1420 ensure!(
1421 current_options
1422 .as_ref()
1423 .is_some_and(|options| options.enable),
1424 error::InvalidColumnOptionSnafu {
1425 column_name,
1426 msg: "FULLTEXT index already disabled".to_string(),
1427 }
1428 );
1429
1430 let mut options = current_options.unwrap();
1431 options.enable = false;
1432 column_schema
1433 .set_fulltext_options(&options)
1434 .context(error::SetFulltextOptionsSnafu { column_name })?;
1435
1436 Ok(())
1437}
1438
1439fn set_column_skipping_index_options(
1440 column_schema: &mut ColumnSchema,
1441 column_name: &str,
1442 options: &SkippingIndexOptions,
1443) -> Result<()> {
1444 column_schema
1445 .set_skipping_options(options)
1446 .context(error::SetSkippingOptionsSnafu { column_name })?;
1447
1448 Ok(())
1449}
1450
1451fn unset_column_skipping_index_options(
1452 column_schema: &mut ColumnSchema,
1453 column_name: &str,
1454) -> Result<()> {
1455 column_schema
1456 .unset_skipping_options()
1457 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1458 Ok(())
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463 use std::assert_matches::assert_matches;
1464
1465 use common_error::ext::ErrorExt;
1466 use common_error::status_code::StatusCode;
1467 use datatypes::data_type::ConcreteDataType;
1468 use datatypes::schema::{
1469 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1470 };
1471
1472 use super::*;
1473 use crate::Error;
1474
1475 fn new_test_schema() -> Schema {
1477 let column_schemas = vec![
1478 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1479 ColumnSchema::new(
1480 "ts",
1481 ConcreteDataType::timestamp_millisecond_datatype(),
1482 false,
1483 )
1484 .with_time_index(true),
1485 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1486 ];
1487 SchemaBuilder::try_from(column_schemas)
1488 .unwrap()
1489 .version(123)
1490 .build()
1491 .unwrap()
1492 }
1493
/// A `TableInfo` converted to `RawTableInfo` and back must equal the original.
#[test]
fn test_raw_convert() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();
    let original = TableInfoBuilder::default()
        .table_id(10)
        .table_version(5)
        .name("mytable")
        .meta(meta)
        .build()
        .unwrap();

    let round_tripped = TableInfo::try_from(RawTableInfo::from(original.clone())).unwrap();

    assert_eq!(original, round_tripped);
}
1517
1518 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1519 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1520 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1521 let alter_kind = AlterKind::AddColumns {
1522 columns: vec![
1523 AddColumnRequest {
1524 column_schema: new_tag,
1525 is_key: true,
1526 location: None,
1527 add_if_not_exists: false,
1528 },
1529 AddColumnRequest {
1530 column_schema: new_field,
1531 is_key: false,
1532 location: None,
1533 add_if_not_exists: false,
1534 },
1535 ],
1536 };
1537
1538 let builder = meta
1539 .builder_with_alter_kind("my_table", &alter_kind)
1540 .unwrap();
1541 builder.build().unwrap()
1542 }
1543
1544 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1545 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1546 let new_field = ColumnSchema::new(
1547 "my_field_after_ts",
1548 ConcreteDataType::string_datatype(),
1549 true,
1550 );
1551 let yet_another_field = ColumnSchema::new(
1552 "yet_another_field_after_ts",
1553 ConcreteDataType::int64_datatype(),
1554 true,
1555 );
1556 let alter_kind = AlterKind::AddColumns {
1557 columns: vec![
1558 AddColumnRequest {
1559 column_schema: new_tag,
1560 is_key: true,
1561 location: Some(AddColumnLocation::First),
1562 add_if_not_exists: false,
1563 },
1564 AddColumnRequest {
1565 column_schema: new_field,
1566 is_key: false,
1567 location: Some(AddColumnLocation::After {
1568 column_name: "ts".to_string(),
1569 }),
1570 add_if_not_exists: false,
1571 },
1572 AddColumnRequest {
1573 column_schema: yet_another_field,
1574 is_key: true,
1575 location: Some(AddColumnLocation::After {
1576 column_name: "ts".to_string(),
1577 }),
1578 add_if_not_exists: false,
1579 },
1580 ],
1581 };
1582
1583 let builder = meta
1584 .builder_with_alter_kind("my_table", &alter_kind)
1585 .unwrap();
1586 builder.build().unwrap()
1587 }
1588
/// Columns added without a location are appended at the end, and the key column
/// joins the primary key while the field column joins the value columns.
#[test]
fn test_add_columns() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let new_meta = add_columns_to_meta(&meta);

    let names: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column_schema| column_schema.name.as_str())
        .collect();
    assert_eq!(vec!["col1", "ts", "col2", "my_tag", "my_field"], names);
    assert_eq!(vec![0, 3], new_meta.primary_key_indices);
    assert_eq!(vec![1, 2, 4], new_meta.value_indices);
}
1611
/// Adding the same new column twice in a single request is rejected with
/// `InvalidArguments`, even when `add_if_not_exists` is set.
#[test]
fn test_add_columns_multiple_times() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // Both requests describe the very same `col3` column.
    let col3_request = || AddColumnRequest {
        column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
        is_key: true,
        location: None,
        add_if_not_exists: true,
    };
    let alter_kind = AlterKind::AddColumns {
        columns: vec![col3_request(), col3_request()],
    };

    let err = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .err()
        .unwrap();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
1653
/// Dropping value columns removes them from the schema, shifts the remaining
/// indices and leaves the timestamp column untouched.
#[test]
fn test_remove_columns() {
    let schema = Arc::new(new_test_schema());
    let base_meta = TableMetaBuilder::empty()
        .schema(schema.clone())
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();
    // Columns are now: col1, ts, col2, my_tag, my_field.
    let meta = add_columns_to_meta(&base_meta);

    let alter_kind = AlterKind::DropColumns {
        names: vec![String::from("col2"), String::from("my_field")],
    };
    let new_meta = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .unwrap()
        .build()
        .unwrap();

    let names: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column_schema| column_schema.name.as_str())
        .collect();
    assert_eq!(vec!["col1", "ts", "my_tag"], names);
    assert_eq!(vec![0, 2], new_meta.primary_key_indices);
    assert_eq!(vec![1], new_meta.value_indices);
    assert_eq!(
        schema.timestamp_column(),
        new_meta.schema.timestamp_column()
    );
}
1690
/// Dropping several columns located before the time index keeps the primary key
/// and timestamp bookkeeping consistent with the shrunken schema.
#[test]
fn test_remove_multiple_columns_before_timestamp() {
    let int_column =
        |name: &str| ColumnSchema::new(name, ConcreteDataType::int32_datatype(), true);
    let column_schemas = vec![
        int_column("col1"),
        int_column("col2"),
        int_column("col3"),
        ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true),
    ];
    let schema = Arc::new(
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap(),
    );
    let meta = TableMetaBuilder::empty()
        .schema(schema.clone())
        .primary_key_indices(vec![1])
        .engine("engine")
        .next_column_id(4)
        .build()
        .unwrap();

    // Drop the columns in an order that differs from the schema order.
    let alter_kind = AlterKind::DropColumns {
        names: vec![String::from("col3"), String::from("col1")],
    };
    let new_meta = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .unwrap()
        .build()
        .unwrap();

    let names: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column_schema| column_schema.name.as_str())
        .collect();
    assert_eq!(vec!["col2", "ts"], names);
    assert_eq!(vec![0], new_meta.primary_key_indices);
    assert_eq!(vec![1], new_meta.value_indices);
    assert_eq!(
        schema.timestamp_column(),
        new_meta.schema.timestamp_column()
    );
}
1743
/// Re-adding an existing column fails with `TableColumnExists` unless
/// `add_if_not_exists` is set; in that case the columns stay the same and only
/// the schema version is bumped by one.
#[test]
fn test_add_existing_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // Without `add_if_not_exists` the duplicate name is an error.
    let strict_add = AlterKind::AddColumns {
        columns: vec![AddColumnRequest {
            column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
            is_key: false,
            location: None,
            add_if_not_exists: false,
        }],
    };
    let err = meta
        .builder_with_alter_kind("my_table", &strict_add)
        .err()
        .unwrap();
    assert_eq!(StatusCode::TableColumnExists, err.status_code());

    // With `add_if_not_exists` and a matching definition the request is accepted.
    let lenient_add = AlterKind::AddColumns {
        columns: vec![AddColumnRequest {
            column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            is_key: true,
            location: None,
            add_if_not_exists: true,
        }],
    };
    let new_meta = meta
        .builder_with_alter_kind("my_table", &lenient_add)
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(
        meta.schema.column_schemas(),
        new_meta.schema.column_schemas()
    );
    assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
}
1790
/// With `add_if_not_exists`, re-adding an existing column under a different
/// definition (string, non-key instead of int32, key) is rejected with
/// `InvalidArguments`.
#[test]
fn test_add_different_type_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let conflicting_request = AddColumnRequest {
        column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
        is_key: false,
        location: None,
        add_if_not_exists: true,
    };
    let alter_kind = AlterKind::AddColumns {
        columns: vec![conflicting_request],
    };

    let err = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .err()
        .unwrap();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
1817
/// Adding the `weny` column, created with `false` as the third
/// `ColumnSchema::new` argument (presumably "not nullable") and no default
/// value, is rejected with `InvalidArguments`.
#[test]
fn test_add_invalid_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let invalid_request = AddColumnRequest {
        column_schema: ColumnSchema::new("weny", ConcreteDataType::string_datatype(), false),
        is_key: false,
        location: None,
        add_if_not_exists: false,
    };
    let alter_kind = AlterKind::AddColumns {
        columns: vec![invalid_request],
    };

    let err = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .err()
        .unwrap();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
1849
/// Dropping a column that does not exist fails with `TableColumnNotFound`.
#[test]
fn test_remove_unknown_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let alter_kind = AlterKind::DropColumns {
        names: vec![String::from("unknown")],
    };

    let err = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .err()
        .unwrap();
    assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
1871
/// Changing the type of a column that does not exist fails with
/// `TableColumnNotFound`.
#[test]
fn test_change_unknown_column_data_type() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let modify_request = ModifyColumnTypeRequest {
        column_name: "unknown".to_string(),
        target_type: ConcreteDataType::string_datatype(),
    };
    let alter_kind = AlterKind::ModifyColumnTypes {
        columns: vec![modify_request],
    };

    let err = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .err()
        .unwrap();
    assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
}
1896
/// Neither a primary key column (`col1`) nor the time index column (`ts`) can
/// be dropped; both attempts fail with `InvalidArguments`.
#[test]
fn test_remove_key_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // First the primary key column, then the time index column.
    for protected_column in ["col1", "ts"] {
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from(protected_column)],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }
}
1930
/// Dropping a column that is used as a partition key fails with
/// `Error::RemovePartitionColumn`.
#[test]
fn test_remove_partition_column() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![])
        .partition_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // `col1` is not a primary key here, only a partition key.
    let alter_kind = AlterKind::DropColumns {
        names: vec![String::from("col1")],
    };

    let err = meta
        .builder_with_alter_kind("my_table", &alter_kind)
        .err()
        .unwrap();
    assert_matches!(err, Error::RemovePartitionColumn { .. });
}
1953
/// Neither a primary key column (`col1`) nor the time index column (`ts`) can
/// have its data type changed; both attempts fail with `InvalidArguments`.
#[test]
fn test_change_key_column_data_type() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // First the primary key column, then the time index column.
    for protected_column in ["col1", "ts"] {
        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: protected_column.to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }
}
1993
/// Allocating a new column advances `next_column_id` by one and returns a
/// descriptor carrying the column's name.
#[test]
fn test_alloc_new_column() {
    let mut meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();
    assert_eq!(3, meta.next_column_id);

    let new_column = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
    let descriptor = meta.alloc_new_column("test_table", &new_column).unwrap();

    assert_eq!(4, meta.next_column_id);
    assert_eq!(new_column.name, descriptor.name);
}
2012
/// Columns added with explicit locations end up at the requested positions, and
/// the primary key, value and partition key indices are shifted to match.
#[test]
fn test_add_columns_with_location() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        // Partition keys are `col1` and `col2` in the original schema.
        .partition_key_indices(vec![0, 2])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    let new_meta = add_columns_to_meta_with_location(&meta);

    let names: Vec<&str> = new_meta
        .schema
        .column_schemas()
        .iter()
        .map(|column_schema| column_schema.name.as_str())
        .collect();
    let expected_names = vec![
        "my_tag_first",
        "col1",
        "ts",
        "yet_another_field_after_ts",
        "my_field_after_ts",
        "col2",
    ];
    assert_eq!(expected_names, names);
    assert_eq!(vec![0, 1, 3], new_meta.primary_key_indices);
    assert_eq!(vec![2, 4, 5], new_meta.value_indices);
    assert_eq!(vec![1, 5], new_meta.partition_key_indices);
}
2048
/// Exercises setting and unsetting fulltext options through `AlterKind`:
/// 1. setting a fulltext index on the int32 column `col1` is rejected;
/// 2. setting it on the string column `my_tag_first` stores the given options;
/// 3. unsetting it keeps the options but clears their `enable` flag.
#[test]
fn test_modify_column_fulltext_options() {
    let meta = TableMetaBuilder::empty()
        .schema(Arc::new(new_test_schema()))
        .primary_key_indices(vec![0])
        .engine("engine")
        .next_column_id(3)
        .build()
        .unwrap();

    // 1. `col1` is an int32 column, so a fulltext index is not allowed on it.
    let set_on_int_column = AlterKind::SetIndexes {
        options: vec![SetIndexOption::Fulltext {
            column_name: "col1".to_string(),
            options: FulltextOptions::default(),
        }],
    };
    let err = meta
        .builder_with_alter_kind("my_table", &set_on_int_column)
        .err()
        .unwrap();
    assert_eq!(
        "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
        err.to_string()
    );

    // 2. Add string columns, then enable a fulltext index on one of them.
    let meta_with_columns = add_columns_to_meta_with_location(&meta);
    let set_on_string_column = AlterKind::SetIndexes {
        options: vec![SetIndexOption::Fulltext {
            column_name: "my_tag_first".to_string(),
            options: FulltextOptions::new_unchecked(
                true,
                FulltextAnalyzer::Chinese,
                true,
                FulltextBackend::Bloom,
                1000,
                0.01,
            ),
        }],
    };
    let meta_with_fulltext = meta_with_columns
        .builder_with_alter_kind("my_table", &set_on_string_column)
        .unwrap()
        .build()
        .unwrap();
    let enabled_options = meta_with_fulltext
        .schema
        .column_schema_by_name("my_tag_first")
        .unwrap()
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(enabled_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, enabled_options.analyzer);
    assert!(enabled_options.case_sensitive);

    // 3. Unset the index: the options remain but are marked as disabled.
    let unset_fulltext = AlterKind::UnsetIndexes {
        options: vec![UnsetIndexOption::Fulltext {
            column_name: "my_tag_first".to_string(),
        }],
    };
    let meta_without_fulltext = meta_with_fulltext
        .builder_with_alter_kind("my_table", &unset_fulltext)
        .unwrap()
        .build()
        .unwrap();
    let disabled_options = meta_without_fulltext
        .schema
        .column_schema_by_name("my_tag_first")
        .unwrap()
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(!disabled_options.enable);
}
2124}