1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
/// Numeric identifier of a table (stored in `TableIdent::table_id`).
pub type TableId = u32;
/// Version number of a table (stored in `TableIdent::version`).
pub type TableVersion = u64;
45
/// Filter push-down kind of a table.
///
/// Every variant converts one-to-one, in both directions, to the same-named
/// variant of DataFusion's `TableProviderFilterPushDown`.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// Counterpart of `TableProviderFilterPushDown::Unsupported`.
    Unsupported,
    /// Counterpart of `TableProviderFilterPushDown::Inexact`.
    Inexact,
    /// Counterpart of `TableProviderFilterPushDown::Exact`.
    Exact,
}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
/// Kind of a table.
///
/// Each variant converts to the same-named variant of
/// `datafusion::datasource::TableType`.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// Displayed as `BASE TABLE`.
    Base,
    /// Displayed as `VIEW`.
    View,
    /// Displayed as `TEMPORARY`.
    Temporary,
}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105 fn from(t: TableType) -> datafusion::datasource::TableType {
106 match t {
107 TableType::Base => datafusion::datasource::TableType::Base,
108 TableType::View => datafusion::datasource::TableType::View,
109 TableType::Temporary => datafusion::datasource::TableType::Temporary,
110 }
111 }
112}
113
/// Identity of a table: its id together with a version number.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Id of the table.
    pub table_id: TableId,
    /// Version of the table; `TableIdent::new` starts it at `0`.
    pub version: TableVersion,
}
123
/// Metadata of a table: its schema plus the index lists and options that go with it.
///
/// Built through `TableMetaBuilder` (mutable pattern, custom constructor).
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table.
    pub schema: SchemaRef,
    /// Positions, within the schema's columns, of the primary key columns.
    pub primary_key_indices: Vec<usize>,
    /// Positions of the value columns. When not set on the builder it defaults
    /// to every column position that is not in `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Engine of the table; an empty string when not set on the builder.
    #[builder(default, setter(into))]
    pub engine: String,
    /// Region numbers of the table; `TableInfo::region_ids` combines them with
    /// the table id into `RegionId`s.
    #[builder(default, setter(into))]
    pub region_numbers: Vec<u32>,
    /// Id handed out by the next `alloc_new_column` call, which then increments it.
    pub next_column_id: ColumnId,
    /// Options of the table.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation time; defaults to the time the builder's `build` runs.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Last update time. Defaults to the builder's `created_on` when that was
    /// set explicitly, otherwise to `DateTime::default()`.
    #[builder(default = "self.default_updated_on()")]
    pub updated_on: DateTime<Utc>,
    /// Positions, within the schema's columns, of the partition columns.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; empty by default.
    // NOTE(review): presumably one id per schema column — confirm against the writers of this field.
    #[builder(default = "Vec::new()")]
    pub column_ids: Vec<ColumnId>,
}
153
impl TableMetaBuilder {
    /// Creates a builder with every field unset.
    ///
    /// Only compiled for tests or when the `testing` feature is enabled.
    #[cfg(any(test, feature = "testing"))]
    pub fn empty() -> Self {
        Self {
            schema: None,
            primary_key_indices: None,
            value_indices: None,
            engine: None,
            region_numbers: None,
            next_column_id: None,
            options: None,
            created_on: None,
            updated_on: None,
            partition_key_indices: None,
            column_ids: None,
        }
    }
}
173
174impl TableMetaBuilder {
175 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
176 match (&self.primary_key_indices, &self.schema) {
177 (Some(v), Some(schema)) => {
178 let column_schemas = schema.column_schemas();
179 Ok((0..column_schemas.len())
180 .filter(|idx| !v.contains(idx))
181 .collect())
182 }
183 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
184 }
185 }
186
187 fn default_updated_on(&self) -> DateTime<Utc> {
188 self.created_on.unwrap_or_default()
189 }
190
191 pub fn new_external_table() -> Self {
192 Self {
193 schema: None,
194 primary_key_indices: Some(Vec::new()),
195 value_indices: Some(Vec::new()),
196 engine: None,
197 region_numbers: Some(Vec::new()),
198 next_column_id: Some(0),
199 options: None,
200 created_on: None,
201 updated_on: None,
202 partition_key_indices: None,
203 column_ids: None,
204 }
205 }
206}
207
/// Add-column requests grouped by where the new column should be placed,
/// as produced by `TableMeta::split_requests_by_column_location`.
struct SplitResult<'a> {
    /// Requests whose location is `AddColumnLocation::First`, in request order.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Requests whose location is `AddColumnLocation::After`, keyed by the name
    /// of the existing column they follow; each list is in request order.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Requests without a location, in request order.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all requested columns, in request order.
    column_names: Vec<String>,
}
219
220impl TableMeta {
221 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
222 let columns_schemas = &self.schema.column_schemas();
223 self.primary_key_indices
224 .iter()
225 .map(|idx| &columns_schemas[*idx].name)
226 }
227
228 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
229 let columns_schemas = self.schema.column_schemas();
231 let primary_key_indices = &self.primary_key_indices;
232 columns_schemas
233 .iter()
234 .enumerate()
235 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
236 .map(|(_, cs)| &cs.name)
237 }
238
239 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
240 let columns_schemas = &self.schema.column_schemas();
241 self.partition_key_indices
242 .iter()
243 .map(|idx| &columns_schemas[*idx].name)
244 }
245
246 pub fn builder_with_alter_kind(
250 &self,
251 table_name: &str,
252 alter_kind: &AlterKind,
253 ) -> Result<TableMetaBuilder> {
254 let mut builder = match alter_kind {
255 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
256 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
257 AlterKind::ModifyColumnTypes { columns } => {
258 self.modify_column_types(table_name, columns)
259 }
260 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
262 AlterKind::SetTableOptions { options } => self.set_table_options(options),
263 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
264 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
265 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
266 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
267 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
268 }?;
269 let _ = builder.updated_on(Utc::now());
270 Ok(builder)
271 }
272
273 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
275 let mut new_options = self.options.clone();
276
277 for request in requests {
278 match request {
279 SetRegionOption::Ttl(new_ttl) => {
280 new_options.ttl = *new_ttl;
281 }
282 SetRegionOption::Twsc(key, value) => {
283 if !value.is_empty() {
284 new_options.extra_options.insert(key.clone(), value.clone());
285 new_options.extra_options.insert(
287 COMPACTION_TYPE.to_string(),
288 COMPACTION_TYPE_TWCS.to_string(),
289 );
290 } else {
291 new_options.extra_options.remove(key.as_str());
293 }
294 }
295 SetRegionOption::Format(value) => {
296 new_options
297 .extra_options
298 .insert(SST_FORMAT_KEY.to_string(), value.clone());
299 }
300 }
301 }
302 let mut builder = self.new_meta_builder();
303 builder.options(new_options);
304
305 Ok(builder)
306 }
307
308 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
309 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
310 self.set_table_options(&requests)
311 }
312
313 fn set_indexes(
314 &self,
315 table_name: &str,
316 requests: &[SetIndexOption],
317 ) -> Result<TableMetaBuilder> {
318 let table_schema = &self.schema;
319 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
320 for request in requests {
321 let column_name = request.column_name();
322 table_schema
323 .column_index_by_name(column_name)
324 .with_context(|| error::ColumnNotExistsSnafu {
325 column_name,
326 table_name,
327 })?;
328 set_index_options
329 .entry(column_name)
330 .or_default()
331 .push(request);
332 }
333
334 let mut meta_builder = self.new_meta_builder();
335 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
336 for mut column in table_schema.column_schemas().iter().cloned() {
337 if let Some(request) = set_index_options.get(column.name.as_str()) {
338 for request in request {
339 self.set_index(&mut column, request)?;
340 }
341 }
342 columns.push(column);
343 }
344
345 let mut builder = SchemaBuilder::try_from_columns(columns)
346 .with_context(|_| error::SchemaBuildSnafu {
347 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
348 })?
349 .version(table_schema.version() + 1);
350
351 for (k, v) in table_schema.metadata().iter() {
352 builder = builder.add_metadata(k, v);
353 }
354
355 let new_schema = builder.build().with_context(|_| {
356 let column_names = requests
357 .iter()
358 .map(|request| request.column_name())
359 .collect::<Vec<_>>();
360 error::SchemaBuildSnafu {
361 msg: format!(
362 "Table {table_name} cannot set index options with columns {column_names:?}",
363 ),
364 }
365 })?;
366 let _ = meta_builder
367 .schema(Arc::new(new_schema))
368 .primary_key_indices(self.primary_key_indices.clone());
369
370 Ok(meta_builder)
371 }
372
373 fn unset_indexes(
374 &self,
375 table_name: &str,
376 requests: &[UnsetIndexOption],
377 ) -> Result<TableMetaBuilder> {
378 let table_schema = &self.schema;
379 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
380 for request in requests {
381 let column_name = request.column_name();
382 table_schema
383 .column_index_by_name(column_name)
384 .with_context(|| error::ColumnNotExistsSnafu {
385 column_name,
386 table_name,
387 })?;
388 set_index_options
389 .entry(column_name)
390 .or_default()
391 .push(request);
392 }
393
394 let mut meta_builder = self.new_meta_builder();
395 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
396 for mut column in table_schema.column_schemas().iter().cloned() {
397 if let Some(request) = set_index_options.get(column.name.as_str()) {
398 for request in request {
399 self.unset_index(&mut column, request)?;
400 }
401 }
402 columns.push(column);
403 }
404
405 let mut builder = SchemaBuilder::try_from_columns(columns)
406 .with_context(|_| error::SchemaBuildSnafu {
407 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
408 })?
409 .version(table_schema.version() + 1);
410
411 for (k, v) in table_schema.metadata().iter() {
412 builder = builder.add_metadata(k, v);
413 }
414
415 let new_schema = builder.build().with_context(|_| {
416 let column_names = requests
417 .iter()
418 .map(|request| request.column_name())
419 .collect::<Vec<_>>();
420 error::SchemaBuildSnafu {
421 msg: format!(
422 "Table {table_name} cannot set index options with columns {column_names:?}",
423 ),
424 }
425 })?;
426 let _ = meta_builder
427 .schema(Arc::new(new_schema))
428 .primary_key_indices(self.primary_key_indices.clone());
429
430 Ok(meta_builder)
431 }
432
433 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
434 match request {
435 SetIndexOption::Fulltext {
436 column_name,
437 options,
438 } => {
439 ensure!(
440 column_schema.data_type.is_string(),
441 error::InvalidColumnOptionSnafu {
442 column_name,
443 msg: "FULLTEXT index only supports string type",
444 }
445 );
446
447 let current_fulltext_options = column_schema
448 .fulltext_options()
449 .context(error::SetFulltextOptionsSnafu { column_name })?;
450 set_column_fulltext_options(
451 column_schema,
452 column_name,
453 options,
454 current_fulltext_options,
455 )?;
456 }
457 SetIndexOption::Inverted { column_name } => {
458 debug_assert_eq!(column_schema.name, *column_name);
459 column_schema.set_inverted_index(true);
460 }
461 SetIndexOption::Skipping {
462 column_name,
463 options,
464 } => {
465 set_column_skipping_index_options(column_schema, column_name, options)?;
466 }
467 }
468
469 Ok(())
470 }
471
472 fn unset_index(
473 &self,
474 column_schema: &mut ColumnSchema,
475 request: &UnsetIndexOption,
476 ) -> Result<()> {
477 match request {
478 UnsetIndexOption::Fulltext { column_name } => {
479 let current_fulltext_options = column_schema
480 .fulltext_options()
481 .context(error::SetFulltextOptionsSnafu { column_name })?;
482 unset_column_fulltext_options(
483 column_schema,
484 column_name,
485 current_fulltext_options.clone(),
486 )?
487 }
488 UnsetIndexOption::Inverted { .. } => {
489 column_schema.set_inverted_index(false);
490 }
491 UnsetIndexOption::Skipping { column_name } => {
492 unset_column_skipping_index_options(column_schema, column_name)?;
493 }
494 }
495
496 Ok(())
497 }
498
499 pub fn alloc_new_column(
504 &mut self,
505 table_name: &str,
506 new_column: &ColumnSchema,
507 ) -> Result<ColumnDescriptor> {
508 let desc = ColumnDescriptorBuilder::new(
509 self.next_column_id as ColumnId,
510 &new_column.name,
511 new_column.data_type.clone(),
512 )
513 .is_nullable(new_column.is_nullable())
514 .default_constraint(new_column.default_constraint().cloned())
515 .build()
516 .context(error::BuildColumnDescriptorSnafu {
517 table_name,
518 column_name: &new_column.name,
519 })?;
520
521 self.next_column_id += 1;
523
524 Ok(desc)
525 }
526
527 fn new_meta_builder(&self) -> TableMetaBuilder {
529 let mut builder = TableMetaBuilder::from(self);
530 builder.value_indices = None;
532 builder
533 }
534
535 fn add_columns(
537 &self,
538 table_name: &str,
539 requests: &[AddColumnRequest],
540 ) -> Result<TableMetaBuilder> {
541 let table_schema = &self.schema;
542 let mut meta_builder = self.new_meta_builder();
543 let original_primary_key_indices: HashSet<&usize> =
544 self.primary_key_indices.iter().collect();
545
546 let mut names = HashSet::with_capacity(requests.len());
547 let mut new_columns = Vec::with_capacity(requests.len());
548 for col_to_add in requests {
549 if let Some(column_schema) =
550 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
551 {
552 ensure!(
554 col_to_add.add_if_not_exists,
555 error::ColumnExistsSnafu {
556 table_name,
557 column_name: &col_to_add.column_schema.name
558 },
559 );
560
561 ensure!(
563 column_schema.data_type == col_to_add.column_schema.data_type,
564 error::InvalidAlterRequestSnafu {
565 table: table_name,
566 err: format!(
567 "column {} already exists with different type {:?}",
568 col_to_add.column_schema.name, column_schema.data_type,
569 ),
570 }
571 );
572 } else {
573 ensure!(
576 names.insert(&col_to_add.column_schema.name),
577 error::InvalidAlterRequestSnafu {
578 table: table_name,
579 err: format!(
580 "add column {} more than once",
581 col_to_add.column_schema.name
582 ),
583 }
584 );
585
586 ensure!(
587 col_to_add.column_schema.is_nullable()
588 || col_to_add.column_schema.default_constraint().is_some(),
589 error::InvalidAlterRequestSnafu {
590 table: table_name,
591 err: format!(
592 "no default value for column {}",
593 col_to_add.column_schema.name
594 ),
595 },
596 );
597
598 new_columns.push(col_to_add.clone());
599 }
600 }
601 let requests = &new_columns[..];
602
603 let SplitResult {
604 columns_at_first,
605 columns_at_after,
606 columns_at_last,
607 column_names,
608 } = self.split_requests_by_column_location(table_name, requests)?;
609 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
610 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
611 columns_at_first.iter().rev().for_each(|request| {
613 if request.is_key {
614 primary_key_indices.push(columns.len());
616 }
617 columns.push(request.column_schema.clone());
618 });
619 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
621 if original_primary_key_indices.contains(&index) {
622 primary_key_indices.push(columns.len());
623 }
624 columns.push(column_schema.clone());
625 if let Some(requests) = columns_at_after.get(&column_schema.name) {
626 requests.iter().rev().for_each(|request| {
627 if request.is_key {
628 primary_key_indices.push(columns.len());
630 }
631 columns.push(request.column_schema.clone());
632 });
633 }
634 }
635 columns_at_last.iter().for_each(|request| {
637 if request.is_key {
638 primary_key_indices.push(columns.len());
640 }
641 columns.push(request.column_schema.clone());
642 });
643
644 let mut builder = SchemaBuilder::try_from(columns)
645 .with_context(|_| error::SchemaBuildSnafu {
646 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
647 })?
648 .version(table_schema.version() + 1);
650 for (k, v) in table_schema.metadata().iter() {
651 builder = builder.add_metadata(k, v);
652 }
653 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
654 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
655 })?;
656
657 let partition_key_indices = self
658 .partition_key_indices
659 .iter()
660 .map(|idx| table_schema.column_name_by_index(*idx))
661 .map(|name| new_schema.column_index_by_name(name).unwrap())
663 .collect();
664
665 let _ = meta_builder
667 .schema(Arc::new(new_schema))
668 .primary_key_indices(primary_key_indices)
669 .partition_key_indices(partition_key_indices);
670
671 Ok(meta_builder)
672 }
673
674 fn remove_columns(
675 &self,
676 table_name: &str,
677 column_names: &[String],
678 ) -> Result<TableMetaBuilder> {
679 let table_schema = &self.schema;
680 let column_names: HashSet<_> = column_names.iter().collect();
681 let mut meta_builder = self.new_meta_builder();
682
683 let timestamp_index = table_schema.timestamp_index();
684 for column_name in &column_names {
686 if let Some(index) = table_schema.column_index_by_name(column_name) {
687 ensure!(
690 !self.primary_key_indices.contains(&index),
691 error::RemoveColumnInIndexSnafu {
692 column_name: *column_name,
693 table_name,
694 }
695 );
696
697 ensure!(
698 !self.partition_key_indices.contains(&index),
699 error::RemovePartitionColumnSnafu {
700 column_name: *column_name,
701 table_name,
702 }
703 );
704
705 if let Some(ts_index) = timestamp_index {
706 ensure!(
708 index != ts_index,
709 error::RemoveColumnInIndexSnafu {
710 column_name: table_schema.column_name_by_index(ts_index),
711 table_name,
712 }
713 );
714 }
715 } else {
716 return error::ColumnNotExistsSnafu {
717 column_name: *column_name,
718 table_name,
719 }
720 .fail()?;
721 }
722 }
723
724 let columns: Vec<_> = table_schema
726 .column_schemas()
727 .iter()
728 .filter(|column_schema| !column_names.contains(&column_schema.name))
729 .cloned()
730 .collect();
731
732 let mut builder = SchemaBuilder::try_from_columns(columns)
733 .with_context(|_| error::SchemaBuildSnafu {
734 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
735 })?
736 .version(table_schema.version() + 1);
738 for (k, v) in table_schema.metadata().iter() {
739 builder = builder.add_metadata(k, v);
740 }
741 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
742 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
743 })?;
744
745 let primary_key_indices = self
747 .primary_key_indices
748 .iter()
749 .map(|idx| table_schema.column_name_by_index(*idx))
750 .map(|name| new_schema.column_index_by_name(name).unwrap())
752 .collect();
753
754 let partition_key_indices = self
755 .partition_key_indices
756 .iter()
757 .map(|idx| table_schema.column_name_by_index(*idx))
758 .map(|name| new_schema.column_index_by_name(name).unwrap())
760 .collect();
761
762 let _ = meta_builder
763 .schema(Arc::new(new_schema))
764 .primary_key_indices(primary_key_indices)
765 .partition_key_indices(partition_key_indices);
766
767 Ok(meta_builder)
768 }
769
770 fn modify_column_types(
771 &self,
772 table_name: &str,
773 requests: &[ModifyColumnTypeRequest],
774 ) -> Result<TableMetaBuilder> {
775 let table_schema = &self.schema;
776 let mut meta_builder = self.new_meta_builder();
777
778 let mut modify_column_types = HashMap::with_capacity(requests.len());
779 let timestamp_index = table_schema.timestamp_index();
780
781 for col_to_change in requests {
782 let change_column_name = &col_to_change.column_name;
783
784 let index = table_schema
785 .column_index_by_name(change_column_name)
786 .with_context(|| error::ColumnNotExistsSnafu {
787 column_name: change_column_name,
788 table_name,
789 })?;
790
791 let column = &table_schema.column_schemas()[index];
792
793 ensure!(
794 !self.primary_key_indices.contains(&index),
795 error::InvalidAlterRequestSnafu {
796 table: table_name,
797 err: format!(
798 "Not allowed to change primary key index column '{}'",
799 column.name
800 )
801 }
802 );
803
804 if let Some(ts_index) = timestamp_index {
805 ensure!(
807 index != ts_index,
808 error::InvalidAlterRequestSnafu {
809 table: table_name,
810 err: format!(
811 "Not allowed to change timestamp index column '{}' datatype",
812 column.name
813 )
814 }
815 );
816 }
817
818 ensure!(
819 modify_column_types
820 .insert(&col_to_change.column_name, col_to_change)
821 .is_none(),
822 error::InvalidAlterRequestSnafu {
823 table: table_name,
824 err: format!(
825 "change column datatype {} more than once",
826 col_to_change.column_name
827 ),
828 }
829 );
830
831 ensure!(
832 column
833 .data_type
834 .can_arrow_type_cast_to(&col_to_change.target_type),
835 error::InvalidAlterRequestSnafu {
836 table: table_name,
837 err: format!(
838 "column '{}' cannot be cast automatically to type '{}'",
839 col_to_change.column_name, col_to_change.target_type,
840 ),
841 }
842 );
843
844 ensure!(
845 column.is_nullable(),
846 error::InvalidAlterRequestSnafu {
847 table: table_name,
848 err: format!(
849 "column '{}' must be nullable to ensure safe conversion.",
850 col_to_change.column_name,
851 ),
852 }
853 );
854 }
855 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
858 for mut column in table_schema.column_schemas().iter().cloned() {
859 if let Some(change_column) = modify_column_types.get(&column.name) {
860 column.data_type = change_column.target_type.clone();
861 let new_default = if let Some(default_value) = column.default_constraint() {
862 Some(
863 default_value
864 .cast_to_datatype(&change_column.target_type)
865 .with_context(|_| error::CastDefaultValueSnafu {
866 reason: format!(
867 "Failed to cast default value from {:?} to type {:?}",
868 default_value, &change_column.target_type
869 ),
870 })?,
871 )
872 } else {
873 None
874 };
875 column = column
876 .clone()
877 .with_default_constraint(new_default.clone())
878 .with_context(|_| error::CastDefaultValueSnafu {
879 reason: format!("Failed to set new default: {:?}", new_default),
880 })?;
881 }
882 columns.push(column)
883 }
884
885 let mut builder = SchemaBuilder::try_from_columns(columns)
886 .with_context(|_| error::SchemaBuildSnafu {
887 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
888 })?
889 .version(table_schema.version() + 1);
891 for (k, v) in table_schema.metadata().iter() {
892 builder = builder.add_metadata(k, v);
893 }
894 let new_schema = builder.build().with_context(|_| {
895 let column_names: Vec<_> = requests
896 .iter()
897 .map(|request| &request.column_name)
898 .collect();
899
900 error::SchemaBuildSnafu {
901 msg: format!(
902 "Table {table_name} cannot change datatype with columns {column_names:?}"
903 ),
904 }
905 })?;
906
907 let _ = meta_builder
908 .schema(Arc::new(new_schema))
909 .primary_key_indices(self.primary_key_indices.clone());
910
911 Ok(meta_builder)
912 }
913
914 fn split_requests_by_column_location<'a>(
916 &self,
917 table_name: &str,
918 requests: &'a [AddColumnRequest],
919 ) -> Result<SplitResult<'a>> {
920 let table_schema = &self.schema;
921 let mut columns_at_first = Vec::new();
922 let mut columns_at_after = HashMap::new();
923 let mut columns_at_last = Vec::new();
924 let mut column_names = Vec::with_capacity(requests.len());
925 for request in requests {
926 let column_name = &request.column_schema.name;
928 column_names.push(column_name.clone());
929 ensure!(
930 table_schema.column_schema_by_name(column_name).is_none(),
931 error::ColumnExistsSnafu {
932 column_name,
933 table_name,
934 }
935 );
936 match request.location.as_ref() {
937 Some(AddColumnLocation::First) => {
938 columns_at_first.push(request);
939 }
940 Some(AddColumnLocation::After { column_name }) => {
941 ensure!(
942 table_schema.column_schema_by_name(column_name).is_some(),
943 error::ColumnNotExistsSnafu {
944 column_name,
945 table_name,
946 }
947 );
948 columns_at_after
949 .entry(column_name.clone())
950 .or_insert(Vec::new())
951 .push(request);
952 }
953 None => {
954 columns_at_last.push(request);
955 }
956 }
957 }
958 Ok(SplitResult {
959 columns_at_first,
960 columns_at_after,
961 columns_at_last,
962 column_names,
963 })
964 }
965
966 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
967 let table_schema = &self.schema;
968 let mut meta_builder = self.new_meta_builder();
969 let mut columns = Vec::with_capacity(table_schema.num_columns());
970 for column_schema in table_schema.column_schemas() {
971 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
972 ensure!(
974 column_schema.default_constraint().is_some(),
975 error::InvalidAlterRequestSnafu {
976 table: table_name,
977 err: format!("column {name} does not have a default value"),
978 }
979 );
980 if !column_schema.is_nullable() {
981 return error::InvalidAlterRequestSnafu {
982 table: table_name,
983 err: format!(
984 "column {name} is not nullable and `default` cannot be dropped",
985 ),
986 }
987 .fail();
988 }
989 let new_column_schema = column_schema.clone();
990 let new_column_schema = new_column_schema
991 .with_default_constraint(None)
992 .with_context(|_| error::SchemaBuildSnafu {
993 msg: format!("Table {table_name} cannot drop default values"),
994 })?;
995 columns.push(new_column_schema);
996 } else {
997 columns.push(column_schema.clone());
998 }
999 }
1000
1001 let mut builder = SchemaBuilder::try_from_columns(columns)
1002 .with_context(|_| error::SchemaBuildSnafu {
1003 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1004 })?
1005 .version(table_schema.version() + 1);
1007 for (k, v) in table_schema.metadata().iter() {
1008 builder = builder.add_metadata(k, v);
1009 }
1010 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1011 msg: format!("Table {table_name} cannot drop default values"),
1012 })?;
1013
1014 let _ = meta_builder.schema(Arc::new(new_schema));
1015
1016 Ok(meta_builder)
1017 }
1018
1019 fn set_defaults(
1020 &self,
1021 table_name: &str,
1022 set_defaults: &[SetDefaultRequest],
1023 ) -> Result<TableMetaBuilder> {
1024 let table_schema = &self.schema;
1025 let mut meta_builder = self.new_meta_builder();
1026 let mut columns = Vec::with_capacity(table_schema.num_columns());
1027 for column_schema in table_schema.column_schemas() {
1028 if let Some(set_default) = set_defaults
1029 .iter()
1030 .find(|s| s.column_name == column_schema.name)
1031 {
1032 let new_column_schema = column_schema.clone();
1033 let new_column_schema = new_column_schema
1034 .with_default_constraint(set_default.default_constraint.clone())
1035 .with_context(|_| error::SchemaBuildSnafu {
1036 msg: format!("Table {table_name} cannot set default values"),
1037 })?;
1038 columns.push(new_column_schema);
1039 } else {
1040 columns.push(column_schema.clone());
1041 }
1042 }
1043
1044 let mut builder = SchemaBuilder::try_from_columns(columns)
1045 .with_context(|_| error::SchemaBuildSnafu {
1046 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1047 })?
1048 .version(table_schema.version() + 1);
1050 for (k, v) in table_schema.metadata().iter() {
1051 builder = builder.add_metadata(k, v);
1052 }
1053 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1054 msg: format!("Table {table_name} cannot set default values"),
1055 })?;
1056
1057 let _ = meta_builder.schema(Arc::new(new_schema));
1058
1059 Ok(meta_builder)
1060 }
1061}
1062
/// Identity, naming and metadata of a table.
///
/// Built through `TableInfoBuilder` (owned pattern).
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table; defaults to `TableIdent::default()`.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table.
    #[builder(setter(into))]
    pub name: String,
    /// Optional description; defaults to `None`.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Catalog the table belongs to; defaults to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Schema (database) the table belongs to; defaults to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table.
    pub meta: TableMeta,
    /// Type of the table; defaults to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
1083
/// Shared, reference-counted [`TableInfo`].
pub type TableInfoRef = Arc<TableInfo>;
1085
1086impl TableInfo {
1087 pub fn table_id(&self) -> TableId {
1088 self.ident.table_id
1089 }
1090
1091 pub fn region_ids(&self) -> Vec<RegionId> {
1092 self.meta
1093 .region_numbers
1094 .iter()
1095 .map(|id| RegionId::new(self.table_id(), *id))
1096 .collect()
1097 }
1098 pub fn full_table_name(&self) -> String {
1100 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1101 }
1102
1103 pub fn get_db_string(&self) -> String {
1104 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1105 }
1106
1107 pub fn is_physical_table(&self) -> bool {
1109 self.meta
1110 .options
1111 .extra_options
1112 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1113 }
1114
1115 pub fn is_ttl_instant_table(&self) -> bool {
1117 self.meta
1118 .options
1119 .ttl
1120 .map(|t| t.is_instant())
1121 .unwrap_or(false)
1122 }
1123}
1124
1125impl TableInfoBuilder {
1126 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1127 Self {
1128 name: Some(name.into()),
1129 meta: Some(meta),
1130 ..Default::default()
1131 }
1132 }
1133
1134 pub fn table_id(mut self, id: TableId) -> Self {
1135 let ident = self.ident.get_or_insert_with(TableIdent::default);
1136 ident.table_id = id;
1137 self
1138 }
1139
1140 pub fn table_version(mut self, version: TableVersion) -> Self {
1141 let ident = self.ident.get_or_insert_with(TableIdent::default);
1142 ident.version = version;
1143 self
1144 }
1145}
1146
1147impl TableIdent {
1148 pub fn new(table_id: TableId) -> Self {
1149 Self {
1150 table_id,
1151 version: 0,
1152 }
1153 }
1154}
1155
1156impl From<TableId> for TableIdent {
1157 fn from(table_id: TableId) -> Self {
1158 Self::new(table_id)
1159 }
1160}
1161
/// Serializable form of `TableMeta`: the same fields, with the schema held as
/// a `RawSchema`.
///
/// `Deserialize` is implemented by hand so that a missing `updated_on` falls
/// back to `created_on`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
pub struct RawTableMeta {
    /// Raw form of the table schema.
    pub schema: RawSchema,
    /// See `TableMeta::primary_key_indices`.
    pub primary_key_indices: Vec<usize>,
    /// See `TableMeta::value_indices`.
    pub value_indices: Vec<usize>,
    /// See `TableMeta::engine`.
    pub engine: String,
    /// See `TableMeta::next_column_id`.
    pub next_column_id: ColumnId,
    /// See `TableMeta::region_numbers`.
    pub region_numbers: Vec<u32>,
    /// See `TableMeta::options`.
    pub options: TableOptions,
    /// See `TableMeta::created_on`.
    pub created_on: DateTime<Utc>,
    /// See `TableMeta::updated_on`.
    pub updated_on: DateTime<Utc>,
    /// See `TableMeta::partition_key_indices`.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
    /// See `TableMeta::column_ids`.
    #[serde(default)]
    pub column_ids: Vec<ColumnId>,
}
1189
1190impl<'de> Deserialize<'de> for RawTableMeta {
1191 fn deserialize<D>(
1192 deserializer: D,
1193 ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1194 where
1195 D: Deserializer<'de>,
1196 {
1197 #[derive(Deserialize)]
1198 struct Helper {
1199 schema: RawSchema,
1200 primary_key_indices: Vec<usize>,
1201 value_indices: Vec<usize>,
1202 engine: String,
1203 next_column_id: u32,
1204 region_numbers: Vec<u32>,
1205 options: TableOptions,
1206 created_on: DateTime<Utc>,
1207 updated_on: Option<DateTime<Utc>>,
1208 #[serde(default)]
1209 partition_key_indices: Vec<usize>,
1210 #[serde(default)]
1211 column_ids: Vec<ColumnId>,
1212 }
1213
1214 let h = Helper::deserialize(deserializer)?;
1215 Ok(RawTableMeta {
1216 schema: h.schema,
1217 primary_key_indices: h.primary_key_indices,
1218 value_indices: h.value_indices,
1219 engine: h.engine,
1220 next_column_id: h.next_column_id,
1221 region_numbers: h.region_numbers,
1222 options: h.options,
1223 created_on: h.created_on,
1224 updated_on: h.updated_on.unwrap_or(h.created_on),
1225 partition_key_indices: h.partition_key_indices,
1226 column_ids: h.column_ids,
1227 })
1228 }
1229}
1230
1231impl From<TableMeta> for RawTableMeta {
1232 fn from(meta: TableMeta) -> RawTableMeta {
1233 RawTableMeta {
1234 schema: RawSchema::from(&*meta.schema),
1235 primary_key_indices: meta.primary_key_indices,
1236 value_indices: meta.value_indices,
1237 engine: meta.engine,
1238 next_column_id: meta.next_column_id,
1239 region_numbers: meta.region_numbers,
1240 options: meta.options,
1241 created_on: meta.created_on,
1242 updated_on: meta.updated_on,
1243 partition_key_indices: meta.partition_key_indices,
1244 column_ids: meta.column_ids,
1245 }
1246 }
1247}
1248
1249impl TryFrom<RawTableMeta> for TableMeta {
1250 type Error = ConvertError;
1251
1252 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1253 Ok(TableMeta {
1254 schema: Arc::new(Schema::try_from(raw.schema)?),
1255 primary_key_indices: raw.primary_key_indices,
1256 value_indices: raw.value_indices,
1257 engine: raw.engine,
1258 region_numbers: raw.region_numbers,
1259 next_column_id: raw.next_column_id,
1260 options: raw.options,
1261 created_on: raw.created_on,
1262 updated_on: raw.updated_on,
1263 partition_key_indices: raw.partition_key_indices,
1264 column_ids: raw.column_ids,
1265 })
1266 }
1267}
1268
/// Serializable form of [`TableInfo`]: identical fields, except that the table meta is
/// held as a [`RawTableMeta`]. The two types convert into each other through
/// `From`/`TryFrom`.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Id and version of the table.
    pub ident: TableIdent,
    /// Name of the table.
    pub name: String,
    /// Optional description of the table.
    pub desc: Option<String>,
    /// Name of the catalog the table belongs to.
    pub catalog_name: String,
    /// Name of the schema the table belongs to.
    pub schema_name: String,
    /// Raw meta of the table.
    pub meta: RawTableMeta,
    /// Type of the table.
    pub table_type: TableType,
}
1280
1281impl RawTableInfo {
1282 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1286 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1287 None
1288 } else {
1289 Some(
1290 self.meta
1291 .column_ids
1292 .iter()
1293 .enumerate()
1294 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1295 .collect(),
1296 )
1297 }
1298 }
1299
1300 pub fn sort_columns(&mut self) {
1302 let column_schemas = &self.meta.schema.column_schemas;
1303 let primary_keys = self
1304 .meta
1305 .primary_key_indices
1306 .iter()
1307 .map(|index| column_schemas[*index].name.clone())
1308 .collect::<HashSet<_>>();
1309
1310 let name_to_ids = self.name_to_ids().unwrap_or_default();
1311 self.meta
1312 .schema
1313 .column_schemas
1314 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1315
1316 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1318 let mut timestamp_index = None;
1319 let mut value_indices =
1320 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1321 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1322 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1323 if primary_keys.contains(&column_schema.name) {
1324 primary_key_indices.push(index);
1325 } else if column_schema.is_time_index() {
1326 value_indices.push(index);
1327 timestamp_index = Some(index);
1328 } else {
1329 value_indices.push(index);
1330 }
1331 if let Some(id) = name_to_ids.get(&column_schema.name) {
1332 column_ids.push(*id);
1333 }
1334 }
1335
1336 self.meta.schema.timestamp_index = timestamp_index;
1338 self.meta.primary_key_indices = primary_key_indices;
1339 self.meta.value_indices = value_indices;
1340 self.meta.column_ids = column_ids;
1341 }
1342
1343 pub fn to_region_options(&self) -> HashMap<String, String> {
1347 HashMap::from(&self.meta.options)
1348 }
1349
1350 pub fn table_ref(&self) -> TableReference<'_> {
1352 TableReference::full(
1353 self.catalog_name.as_str(),
1354 self.schema_name.as_str(),
1355 self.name.as_str(),
1356 )
1357 }
1358}
1359
1360impl From<TableInfo> for RawTableInfo {
1361 fn from(info: TableInfo) -> RawTableInfo {
1362 RawTableInfo {
1363 ident: info.ident,
1364 name: info.name,
1365 desc: info.desc,
1366 catalog_name: info.catalog_name,
1367 schema_name: info.schema_name,
1368 meta: RawTableMeta::from(info.meta),
1369 table_type: info.table_type,
1370 }
1371 }
1372}
1373
1374impl TryFrom<RawTableInfo> for TableInfo {
1375 type Error = ConvertError;
1376
1377 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1378 Ok(TableInfo {
1379 ident: raw.ident,
1380 name: raw.name,
1381 desc: raw.desc,
1382 catalog_name: raw.catalog_name,
1383 schema_name: raw.schema_name,
1384 meta: TableMeta::try_from(raw.meta)?,
1385 table_type: raw.table_type,
1386 })
1387 }
1388}
1389
1390fn set_column_fulltext_options(
1399 column_schema: &mut ColumnSchema,
1400 column_name: &str,
1401 options: &FulltextOptions,
1402 current_options: Option<FulltextOptions>,
1403) -> Result<()> {
1404 if let Some(current_options) = current_options {
1405 ensure!(
1406 current_options.analyzer == options.analyzer
1407 && current_options.case_sensitive == options.case_sensitive,
1408 error::InvalidColumnOptionSnafu {
1409 column_name,
1410 msg: format!(
1411 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1412 current_options.analyzer, current_options.case_sensitive
1413 ),
1414 }
1415 );
1416 }
1417
1418 column_schema
1419 .set_fulltext_options(options)
1420 .context(error::SetFulltextOptionsSnafu { column_name })?;
1421
1422 Ok(())
1423}
1424
1425fn unset_column_fulltext_options(
1426 column_schema: &mut ColumnSchema,
1427 column_name: &str,
1428 current_options: Option<FulltextOptions>,
1429) -> Result<()> {
1430 ensure!(
1431 current_options
1432 .as_ref()
1433 .is_some_and(|options| options.enable),
1434 error::InvalidColumnOptionSnafu {
1435 column_name,
1436 msg: "FULLTEXT index already disabled".to_string(),
1437 }
1438 );
1439
1440 let mut options = current_options.unwrap();
1441 options.enable = false;
1442 column_schema
1443 .set_fulltext_options(&options)
1444 .context(error::SetFulltextOptionsSnafu { column_name })?;
1445
1446 Ok(())
1447}
1448
1449fn set_column_skipping_index_options(
1450 column_schema: &mut ColumnSchema,
1451 column_name: &str,
1452 options: &SkippingIndexOptions,
1453) -> Result<()> {
1454 column_schema
1455 .set_skipping_options(options)
1456 .context(error::SetSkippingOptionsSnafu { column_name })?;
1457
1458 Ok(())
1459}
1460
1461fn unset_column_skipping_index_options(
1462 column_schema: &mut ColumnSchema,
1463 column_name: &str,
1464) -> Result<()> {
1465 column_schema
1466 .unset_skipping_options()
1467 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1468 Ok(())
1469}
1470
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
    };

    use super::*;
    use crate::Error;

    /// Builds the schema most tests start from: `col1` (int32), `ts` (millisecond
    /// timestamp, time index) and `col2` (int32), with schema version 123.
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Builds the table meta most tests start from: the schema of [`new_test_schema`],
    /// `col1` as the only primary key column and `3` as the next column id.
    fn new_test_meta() -> TableMeta {
        TableMetaBuilder::empty()
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap()
    }

    /// Collects the column names of `meta`'s schema, in schema order.
    fn column_names(meta: &TableMeta) -> Vec<String> {
        meta.schema
            .column_schemas()
            .iter()
            .map(|column_schema| column_schema.name.clone())
            .collect()
    }

    #[test]
    fn test_raw_convert() {
        let meta = new_test_meta();
        let info = TableInfoBuilder::default()
            .table_id(10)
            .table_version(5)
            .name("mytable")
            .meta(meta)
            .build()
            .unwrap();

        // A round trip through the raw form must not lose anything.
        let raw = RawTableInfo::from(info.clone());
        let info_new = TableInfo::try_from(raw).unwrap();

        assert_eq!(info, info_new);
    }

    /// Appends the key column `my_tag` and the non-key column `my_field` to `meta`.
    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
        let alter_kind = AlterKind::AddColumns {
            columns: vec![
                AddColumnRequest {
                    column_schema: new_tag,
                    is_key: true,
                    location: None,
                    add_if_not_exists: false,
                },
                AddColumnRequest {
                    column_schema: new_field,
                    is_key: false,
                    location: None,
                    add_if_not_exists: false,
                },
            ],
        };

        let builder = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap();
        builder.build().unwrap()
    }

    /// Adds three columns to `meta` at explicit locations: one key column at the front
    /// and two columns (one non-key, one key) after `ts`.
    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
        let new_field = ColumnSchema::new(
            "my_field_after_ts",
            ConcreteDataType::string_datatype(),
            true,
        );
        let yet_another_field = ColumnSchema::new(
            "yet_another_field_after_ts",
            ConcreteDataType::int64_datatype(),
            true,
        );
        let alter_kind = AlterKind::AddColumns {
            columns: vec![
                AddColumnRequest {
                    column_schema: new_tag,
                    is_key: true,
                    location: Some(AddColumnLocation::First),
                    add_if_not_exists: false,
                },
                AddColumnRequest {
                    column_schema: new_field,
                    is_key: false,
                    location: Some(AddColumnLocation::After {
                        column_name: "ts".to_string(),
                    }),
                    add_if_not_exists: false,
                },
                AddColumnRequest {
                    column_schema: yet_another_field,
                    is_key: true,
                    location: Some(AddColumnLocation::After {
                        column_name: "ts".to_string(),
                    }),
                    add_if_not_exists: false,
                },
            ],
        };

        let builder = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap();
        builder.build().unwrap()
    }

    #[test]
    fn test_add_columns() {
        let meta = new_test_meta();

        let new_meta = add_columns_to_meta(&meta);
        assert_eq!(meta.region_numbers, new_meta.region_numbers);

        let names = column_names(&new_meta);
        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
    }

    #[test]
    fn test_add_columns_multiple_times() {
        let meta = new_test_meta();

        // The same new column appears twice in a single request.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![
                AddColumnRequest {
                    column_schema: ColumnSchema::new(
                        "col3",
                        ConcreteDataType::int32_datatype(),
                        true,
                    ),
                    is_key: true,
                    location: None,
                    add_if_not_exists: true,
                },
                AddColumnRequest {
                    column_schema: ColumnSchema::new(
                        "col3",
                        ConcreteDataType::int32_datatype(),
                        true,
                    ),
                    is_key: true,
                    location: None,
                    add_if_not_exists: true,
                },
            ],
        };
        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_remove_columns() {
        let schema = new_test_schema();
        // Add more columns so that there is something to drop besides the originals.
        let meta = add_columns_to_meta(&new_test_meta());

        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col2"), String::from("my_field")],
        };
        let new_meta = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(meta.region_numbers, new_meta.region_numbers);

        let names = column_names(&new_meta);
        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
        assert_eq!(&[1], &new_meta.value_indices[..]);
        assert_eq!(
            schema.timestamp_column(),
            new_meta.schema.timestamp_column()
        );
    }

    #[test]
    fn test_remove_multiple_columns_before_timestamp() {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
        ];
        let schema = Arc::new(
            SchemaBuilder::try_from(column_schemas)
                .unwrap()
                .version(123)
                .build()
                .unwrap(),
        );
        let meta = TableMetaBuilder::empty()
            .schema(schema.clone())
            .primary_key_indices(vec![1])
            .engine("engine")
            .next_column_id(4)
            .build()
            .unwrap();

        // The columns are dropped in reverse order; the time index must stay valid.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col3"), String::from("col1")],
        };
        let new_meta = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap()
            .build()
            .unwrap();

        let names = column_names(&new_meta);
        assert_eq!(&["col2", "ts"], &names[..]);
        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
        assert_eq!(&[1], &new_meta.value_indices[..]);
        assert_eq!(
            schema.timestamp_column(),
            new_meta.schema.timestamp_column()
        );
    }

    #[test]
    fn test_add_existing_column() {
        let meta = new_test_meta();

        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
                is_key: false,
                location: None,
                add_if_not_exists: false,
            }],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::TableColumnExists, err.status_code());

        // With `add_if_not_exists`, adding the same column again is accepted.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
                is_key: true,
                location: None,
                add_if_not_exists: true,
            }],
        };
        let new_meta = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap()
            .build()
            .unwrap();
        // The columns stay the same while the schema version is increased by one.
        assert_eq!(
            meta.schema.column_schemas(),
            new_meta.schema.column_schemas()
        );
        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
    }

    #[test]
    fn test_add_different_type_column() {
        let meta = new_test_meta();

        // `col1` exists as an int32 column; the request declares it as a string.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
                is_key: false,
                location: None,
                add_if_not_exists: true,
            }],
        };
        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_add_invalid_column() {
        let meta = new_test_meta();

        // The new column is declared non-nullable and has no default value.
        let alter_kind = AlterKind::AddColumns {
            columns: vec![AddColumnRequest {
                column_schema: ColumnSchema::new(
                    "weny",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                is_key: false,
                location: None,
                add_if_not_exists: false,
            }],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_remove_unknown_column() {
        let meta = new_test_meta();

        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("unknown")],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
    }

    #[test]
    fn test_change_unknown_column_data_type() {
        let meta = new_test_meta();

        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: "unknown".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
    }

    #[test]
    fn test_remove_key_column() {
        let meta = new_test_meta();

        // `col1` is the primary key column.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col1")],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());

        // `ts` is the time index column.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("ts")],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_remove_partition_column() {
        let schema = Arc::new(new_test_schema());
        let meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![])
            .partition_key_indices(vec![0])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();
        // `col1` is a partition column here, but not a primary key column.
        let alter_kind = AlterKind::DropColumns {
            names: vec![String::from("col1")],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_matches!(err, Error::RemovePartitionColumn { .. });
    }

    #[test]
    fn test_change_key_column_data_type() {
        let meta = new_test_meta();

        // `col1` is the primary key column.
        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: "col1".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());

        // `ts` is the time index column.
        let alter_kind = AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnTypeRequest {
                column_name: "ts".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        };

        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_alloc_new_column() {
        let mut meta = new_test_meta();
        assert_eq!(3, meta.next_column_id);

        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();

        // Allocating a column advances the next column id by one.
        assert_eq!(4, meta.next_column_id);
        assert_eq!(column_schema.name, desc.name);
    }

    #[test]
    fn test_add_columns_with_location() {
        let schema = Arc::new(new_test_schema());
        let meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![0])
            // `col1` and `col2` are the partition columns.
            .partition_key_indices(vec![0, 2])
            .engine("engine")
            .next_column_id(3)
            .build()
            .unwrap();

        let new_meta = add_columns_to_meta_with_location(&meta);
        assert_eq!(meta.region_numbers, new_meta.region_numbers);

        let names = column_names(&new_meta);
        // Of the two columns added after `ts`, the one requested last sits closest to it.
        assert_eq!(
            &[
                "my_tag_first",
                "col1",
                "ts",
                "yet_another_field_after_ts",
                "my_field_after_ts",
                "col2",
            ],
            &names[..]
        );
        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
        // The partition columns `col1` and `col2` have moved to positions 1 and 5.
        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
    }

    #[test]
    fn test_modify_column_fulltext_options() {
        let meta = new_test_meta();

        // `col1` is an int32 column, which cannot carry a FULLTEXT index.
        let alter_kind = AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "col1".to_string(),
                options: FulltextOptions::default(),
            }],
        };
        let err = meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .err()
            .unwrap();
        assert_eq!(
            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
            err.to_string()
        );

        // Add string columns so that there is a column that accepts a FULLTEXT index.
        let new_meta = add_columns_to_meta_with_location(&meta);
        assert_eq!(meta.region_numbers, new_meta.region_numbers);

        let alter_kind = AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "my_tag_first".to_string(),
                options: FulltextOptions::new_unchecked(
                    true,
                    FulltextAnalyzer::Chinese,
                    true,
                    FulltextBackend::Bloom,
                    1000,
                    0.01,
                ),
            }],
        };
        let new_meta = new_meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap()
            .build()
            .unwrap();
        let column_schema = new_meta
            .schema
            .column_schema_by_name("my_tag_first")
            .unwrap();
        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
        assert!(fulltext_options.enable);
        assert_eq!(FulltextAnalyzer::Chinese, fulltext_options.analyzer);
        assert!(fulltext_options.case_sensitive);

        // Unsetting the index keeps the options on the column but disables them.
        let alter_kind = AlterKind::UnsetIndexes {
            options: vec![UnsetIndexOption::Fulltext {
                column_name: "my_tag_first".to_string(),
            }],
        };
        let new_meta = new_meta
            .builder_with_alter_kind("my_table", &alter_kind)
            .unwrap()
            .build()
            .unwrap();
        let column_schema = new_meta
            .schema
            .column_schema_by_name("my_tag_first")
            .unwrap();
        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
        assert!(!fulltext_options.enable);
    }
}