1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50 Unsupported,
52 Inexact,
57 Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
83#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86 Base,
88 View,
90 Temporary,
92}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105 fn from(t: TableType) -> datafusion::datasource::TableType {
106 match t {
107 TableType::Base => datafusion::datasource::TableType::Base,
108 TableType::View => datafusion::datasource::TableType::View,
109 TableType::Temporary => datafusion::datasource::TableType::Temporary,
110 }
111 }
112}
113
114#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
116pub struct TableIdent {
117 pub table_id: TableId,
119 pub version: TableVersion,
122}
123
124#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
128#[builder(pattern = "mutable", custom_constructor)]
129pub struct TableMeta {
130 pub schema: SchemaRef,
131 pub primary_key_indices: Vec<usize>,
134 #[builder(default = "self.default_value_indices()?")]
135 pub value_indices: Vec<usize>,
136 #[builder(default, setter(into))]
137 pub engine: String,
138 #[builder(default, setter(into))]
139 pub region_numbers: Vec<u32>,
140 pub next_column_id: ColumnId,
141 #[builder(default)]
143 pub options: TableOptions,
144 #[builder(default = "Utc::now()")]
145 pub created_on: DateTime<Utc>,
146 #[builder(default = "self.default_updated_on()")]
147 pub updated_on: DateTime<Utc>,
148 #[builder(default = "Vec::new()")]
149 pub partition_key_indices: Vec<usize>,
150 #[builder(default = "Vec::new()")]
151 pub column_ids: Vec<ColumnId>,
152}
153
154impl TableMetaBuilder {
155 #[cfg(any(test, feature = "testing"))]
157 pub fn empty() -> Self {
158 Self {
159 schema: None,
160 primary_key_indices: None,
161 value_indices: None,
162 engine: None,
163 region_numbers: None,
164 next_column_id: None,
165 options: None,
166 created_on: None,
167 updated_on: None,
168 partition_key_indices: None,
169 column_ids: None,
170 }
171 }
172}
173
174impl TableMetaBuilder {
175 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
176 match (&self.primary_key_indices, &self.schema) {
177 (Some(v), Some(schema)) => {
178 let column_schemas = schema.column_schemas();
179 Ok((0..column_schemas.len())
180 .filter(|idx| !v.contains(idx))
181 .collect())
182 }
183 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
184 }
185 }
186
187 fn default_updated_on(&self) -> DateTime<Utc> {
188 self.created_on.unwrap_or_default()
189 }
190
191 pub fn new_external_table() -> Self {
192 Self {
193 schema: None,
194 primary_key_indices: Some(Vec::new()),
195 value_indices: Some(Vec::new()),
196 engine: None,
197 region_numbers: Some(Vec::new()),
198 next_column_id: Some(0),
199 options: None,
200 created_on: None,
201 updated_on: None,
202 partition_key_indices: None,
203 column_ids: None,
204 }
205 }
206}
207
208struct SplitResult<'a> {
210 columns_at_first: Vec<&'a AddColumnRequest>,
212 columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
214 columns_at_last: Vec<&'a AddColumnRequest>,
216 column_names: Vec<String>,
218}
219
220impl TableMeta {
221 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
222 let columns_schemas = &self.schema.column_schemas();
223 self.primary_key_indices
224 .iter()
225 .map(|idx| &columns_schemas[*idx].name)
226 }
227
228 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
229 let columns_schemas = self.schema.column_schemas();
231 let primary_key_indices = &self.primary_key_indices;
232 columns_schemas
233 .iter()
234 .enumerate()
235 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
236 .map(|(_, cs)| &cs.name)
237 }
238
239 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
240 let columns_schemas = &self.schema.column_schemas();
241 self.partition_key_indices
242 .iter()
243 .map(|idx| &columns_schemas[*idx].name)
244 }
245
246 pub fn builder_with_alter_kind(
250 &self,
251 table_name: &str,
252 alter_kind: &AlterKind,
253 ) -> Result<TableMetaBuilder> {
254 let mut builder = match alter_kind {
255 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
256 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
257 AlterKind::ModifyColumnTypes { columns } => {
258 self.modify_column_types(table_name, columns)
259 }
260 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
262 AlterKind::SetTableOptions { options } => self.set_table_options(options),
263 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
264 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
265 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
266 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
267 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
268 }?;
269 let _ = builder.updated_on(Utc::now());
270 Ok(builder)
271 }
272
273 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
275 let mut new_options = self.options.clone();
276
277 for request in requests {
278 match request {
279 SetRegionOption::Ttl(new_ttl) => {
280 new_options.ttl = *new_ttl;
281 }
282 SetRegionOption::Twsc(key, value) => {
283 if !value.is_empty() {
284 new_options.extra_options.insert(key.clone(), value.clone());
285 new_options.extra_options.insert(
287 COMPACTION_TYPE.to_string(),
288 COMPACTION_TYPE_TWCS.to_string(),
289 );
290 } else {
291 new_options.extra_options.remove(key.as_str());
293 }
294 }
295 }
296 }
297 let mut builder = self.new_meta_builder();
298 builder.options(new_options);
299
300 Ok(builder)
301 }
302
303 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
304 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
305 self.set_table_options(&requests)
306 }
307
308 fn set_indexes(
309 &self,
310 table_name: &str,
311 requests: &[SetIndexOption],
312 ) -> Result<TableMetaBuilder> {
313 let table_schema = &self.schema;
314 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
315 for request in requests {
316 let column_name = request.column_name();
317 table_schema
318 .column_index_by_name(column_name)
319 .with_context(|| error::ColumnNotExistsSnafu {
320 column_name,
321 table_name,
322 })?;
323 set_index_options
324 .entry(column_name)
325 .or_default()
326 .push(request);
327 }
328
329 let mut meta_builder = self.new_meta_builder();
330 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
331 for mut column in table_schema.column_schemas().iter().cloned() {
332 if let Some(request) = set_index_options.get(column.name.as_str()) {
333 for request in request {
334 self.set_index(&mut column, request)?;
335 }
336 }
337 columns.push(column);
338 }
339
340 let mut builder = SchemaBuilder::try_from_columns(columns)
341 .with_context(|_| error::SchemaBuildSnafu {
342 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
343 })?
344 .version(table_schema.version() + 1);
345
346 for (k, v) in table_schema.metadata().iter() {
347 builder = builder.add_metadata(k, v);
348 }
349
350 let new_schema = builder.build().with_context(|_| {
351 let column_names = requests
352 .iter()
353 .map(|request| request.column_name())
354 .collect::<Vec<_>>();
355 error::SchemaBuildSnafu {
356 msg: format!(
357 "Table {table_name} cannot set index options with columns {column_names:?}",
358 ),
359 }
360 })?;
361 let _ = meta_builder
362 .schema(Arc::new(new_schema))
363 .primary_key_indices(self.primary_key_indices.clone());
364
365 Ok(meta_builder)
366 }
367
368 fn unset_indexes(
369 &self,
370 table_name: &str,
371 requests: &[UnsetIndexOption],
372 ) -> Result<TableMetaBuilder> {
373 let table_schema = &self.schema;
374 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
375 for request in requests {
376 let column_name = request.column_name();
377 table_schema
378 .column_index_by_name(column_name)
379 .with_context(|| error::ColumnNotExistsSnafu {
380 column_name,
381 table_name,
382 })?;
383 set_index_options
384 .entry(column_name)
385 .or_default()
386 .push(request);
387 }
388
389 let mut meta_builder = self.new_meta_builder();
390 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
391 for mut column in table_schema.column_schemas().iter().cloned() {
392 if let Some(request) = set_index_options.get(column.name.as_str()) {
393 for request in request {
394 self.unset_index(&mut column, request)?;
395 }
396 }
397 columns.push(column);
398 }
399
400 let mut builder = SchemaBuilder::try_from_columns(columns)
401 .with_context(|_| error::SchemaBuildSnafu {
402 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
403 })?
404 .version(table_schema.version() + 1);
405
406 for (k, v) in table_schema.metadata().iter() {
407 builder = builder.add_metadata(k, v);
408 }
409
410 let new_schema = builder.build().with_context(|_| {
411 let column_names = requests
412 .iter()
413 .map(|request| request.column_name())
414 .collect::<Vec<_>>();
415 error::SchemaBuildSnafu {
416 msg: format!(
417 "Table {table_name} cannot set index options with columns {column_names:?}",
418 ),
419 }
420 })?;
421 let _ = meta_builder
422 .schema(Arc::new(new_schema))
423 .primary_key_indices(self.primary_key_indices.clone());
424
425 Ok(meta_builder)
426 }
427
428 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
429 match request {
430 SetIndexOption::Fulltext {
431 column_name,
432 options,
433 } => {
434 ensure!(
435 column_schema.data_type.is_string(),
436 error::InvalidColumnOptionSnafu {
437 column_name,
438 msg: "FULLTEXT index only supports string type",
439 }
440 );
441
442 let current_fulltext_options = column_schema
443 .fulltext_options()
444 .context(error::SetFulltextOptionsSnafu { column_name })?;
445 set_column_fulltext_options(
446 column_schema,
447 column_name,
448 options,
449 current_fulltext_options,
450 )?;
451 }
452 SetIndexOption::Inverted { column_name } => {
453 debug_assert_eq!(column_schema.name, *column_name);
454 column_schema.set_inverted_index(true);
455 }
456 SetIndexOption::Skipping {
457 column_name,
458 options,
459 } => {
460 set_column_skipping_index_options(column_schema, column_name, options)?;
461 }
462 }
463
464 Ok(())
465 }
466
467 fn unset_index(
468 &self,
469 column_schema: &mut ColumnSchema,
470 request: &UnsetIndexOption,
471 ) -> Result<()> {
472 match request {
473 UnsetIndexOption::Fulltext { column_name } => {
474 let current_fulltext_options = column_schema
475 .fulltext_options()
476 .context(error::SetFulltextOptionsSnafu { column_name })?;
477 unset_column_fulltext_options(
478 column_schema,
479 column_name,
480 current_fulltext_options.clone(),
481 )?
482 }
483 UnsetIndexOption::Inverted { .. } => {
484 column_schema.set_inverted_index(false);
485 }
486 UnsetIndexOption::Skipping { column_name } => {
487 unset_column_skipping_index_options(column_schema, column_name)?;
488 }
489 }
490
491 Ok(())
492 }
493
494 pub fn alloc_new_column(
499 &mut self,
500 table_name: &str,
501 new_column: &ColumnSchema,
502 ) -> Result<ColumnDescriptor> {
503 let desc = ColumnDescriptorBuilder::new(
504 self.next_column_id as ColumnId,
505 &new_column.name,
506 new_column.data_type.clone(),
507 )
508 .is_nullable(new_column.is_nullable())
509 .default_constraint(new_column.default_constraint().cloned())
510 .build()
511 .context(error::BuildColumnDescriptorSnafu {
512 table_name,
513 column_name: &new_column.name,
514 })?;
515
516 self.next_column_id += 1;
518
519 Ok(desc)
520 }
521
522 fn new_meta_builder(&self) -> TableMetaBuilder {
524 let mut builder = TableMetaBuilder::from(self);
525 builder.value_indices = None;
527 builder
528 }
529
530 fn add_columns(
532 &self,
533 table_name: &str,
534 requests: &[AddColumnRequest],
535 ) -> Result<TableMetaBuilder> {
536 let table_schema = &self.schema;
537 let mut meta_builder = self.new_meta_builder();
538 let original_primary_key_indices: HashSet<&usize> =
539 self.primary_key_indices.iter().collect();
540
541 let mut names = HashSet::with_capacity(requests.len());
542 let mut new_columns = Vec::with_capacity(requests.len());
543 for col_to_add in requests {
544 if let Some(column_schema) =
545 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
546 {
547 ensure!(
549 col_to_add.add_if_not_exists,
550 error::ColumnExistsSnafu {
551 table_name,
552 column_name: &col_to_add.column_schema.name
553 },
554 );
555
556 ensure!(
558 column_schema.data_type == col_to_add.column_schema.data_type,
559 error::InvalidAlterRequestSnafu {
560 table: table_name,
561 err: format!(
562 "column {} already exists with different type {:?}",
563 col_to_add.column_schema.name, column_schema.data_type,
564 ),
565 }
566 );
567 } else {
568 ensure!(
571 names.insert(&col_to_add.column_schema.name),
572 error::InvalidAlterRequestSnafu {
573 table: table_name,
574 err: format!(
575 "add column {} more than once",
576 col_to_add.column_schema.name
577 ),
578 }
579 );
580
581 ensure!(
582 col_to_add.column_schema.is_nullable()
583 || col_to_add.column_schema.default_constraint().is_some(),
584 error::InvalidAlterRequestSnafu {
585 table: table_name,
586 err: format!(
587 "no default value for column {}",
588 col_to_add.column_schema.name
589 ),
590 },
591 );
592
593 new_columns.push(col_to_add.clone());
594 }
595 }
596 let requests = &new_columns[..];
597
598 let SplitResult {
599 columns_at_first,
600 columns_at_after,
601 columns_at_last,
602 column_names,
603 } = self.split_requests_by_column_location(table_name, requests)?;
604 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
605 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
606 columns_at_first.iter().rev().for_each(|request| {
608 if request.is_key {
609 primary_key_indices.push(columns.len());
611 }
612 columns.push(request.column_schema.clone());
613 });
614 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
616 if original_primary_key_indices.contains(&index) {
617 primary_key_indices.push(columns.len());
618 }
619 columns.push(column_schema.clone());
620 if let Some(requests) = columns_at_after.get(&column_schema.name) {
621 requests.iter().rev().for_each(|request| {
622 if request.is_key {
623 primary_key_indices.push(columns.len());
625 }
626 columns.push(request.column_schema.clone());
627 });
628 }
629 }
630 columns_at_last.iter().for_each(|request| {
632 if request.is_key {
633 primary_key_indices.push(columns.len());
635 }
636 columns.push(request.column_schema.clone());
637 });
638
639 let mut builder = SchemaBuilder::try_from(columns)
640 .with_context(|_| error::SchemaBuildSnafu {
641 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
642 })?
643 .version(table_schema.version() + 1);
645 for (k, v) in table_schema.metadata().iter() {
646 builder = builder.add_metadata(k, v);
647 }
648 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
649 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
650 })?;
651
652 let partition_key_indices = self
653 .partition_key_indices
654 .iter()
655 .map(|idx| table_schema.column_name_by_index(*idx))
656 .map(|name| new_schema.column_index_by_name(name).unwrap())
658 .collect();
659
660 let _ = meta_builder
662 .schema(Arc::new(new_schema))
663 .primary_key_indices(primary_key_indices)
664 .partition_key_indices(partition_key_indices);
665
666 Ok(meta_builder)
667 }
668
669 fn remove_columns(
670 &self,
671 table_name: &str,
672 column_names: &[String],
673 ) -> Result<TableMetaBuilder> {
674 let table_schema = &self.schema;
675 let column_names: HashSet<_> = column_names.iter().collect();
676 let mut meta_builder = self.new_meta_builder();
677
678 let timestamp_index = table_schema.timestamp_index();
679 for column_name in &column_names {
681 if let Some(index) = table_schema.column_index_by_name(column_name) {
682 ensure!(
685 !self.primary_key_indices.contains(&index),
686 error::RemoveColumnInIndexSnafu {
687 column_name: *column_name,
688 table_name,
689 }
690 );
691
692 ensure!(
693 !self.partition_key_indices.contains(&index),
694 error::RemovePartitionColumnSnafu {
695 column_name: *column_name,
696 table_name,
697 }
698 );
699
700 if let Some(ts_index) = timestamp_index {
701 ensure!(
703 index != ts_index,
704 error::RemoveColumnInIndexSnafu {
705 column_name: table_schema.column_name_by_index(ts_index),
706 table_name,
707 }
708 );
709 }
710 } else {
711 return error::ColumnNotExistsSnafu {
712 column_name: *column_name,
713 table_name,
714 }
715 .fail()?;
716 }
717 }
718
719 let columns: Vec<_> = table_schema
721 .column_schemas()
722 .iter()
723 .filter(|column_schema| !column_names.contains(&column_schema.name))
724 .cloned()
725 .collect();
726
727 let mut builder = SchemaBuilder::try_from_columns(columns)
728 .with_context(|_| error::SchemaBuildSnafu {
729 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
730 })?
731 .version(table_schema.version() + 1);
733 for (k, v) in table_schema.metadata().iter() {
734 builder = builder.add_metadata(k, v);
735 }
736 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
737 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
738 })?;
739
740 let primary_key_indices = self
742 .primary_key_indices
743 .iter()
744 .map(|idx| table_schema.column_name_by_index(*idx))
745 .map(|name| new_schema.column_index_by_name(name).unwrap())
747 .collect();
748
749 let partition_key_indices = self
750 .partition_key_indices
751 .iter()
752 .map(|idx| table_schema.column_name_by_index(*idx))
753 .map(|name| new_schema.column_index_by_name(name).unwrap())
755 .collect();
756
757 let _ = meta_builder
758 .schema(Arc::new(new_schema))
759 .primary_key_indices(primary_key_indices)
760 .partition_key_indices(partition_key_indices);
761
762 Ok(meta_builder)
763 }
764
765 fn modify_column_types(
766 &self,
767 table_name: &str,
768 requests: &[ModifyColumnTypeRequest],
769 ) -> Result<TableMetaBuilder> {
770 let table_schema = &self.schema;
771 let mut meta_builder = self.new_meta_builder();
772
773 let mut modify_column_types = HashMap::with_capacity(requests.len());
774 let timestamp_index = table_schema.timestamp_index();
775
776 for col_to_change in requests {
777 let change_column_name = &col_to_change.column_name;
778
779 let index = table_schema
780 .column_index_by_name(change_column_name)
781 .with_context(|| error::ColumnNotExistsSnafu {
782 column_name: change_column_name,
783 table_name,
784 })?;
785
786 let column = &table_schema.column_schemas()[index];
787
788 ensure!(
789 !self.primary_key_indices.contains(&index),
790 error::InvalidAlterRequestSnafu {
791 table: table_name,
792 err: format!(
793 "Not allowed to change primary key index column '{}'",
794 column.name
795 )
796 }
797 );
798
799 if let Some(ts_index) = timestamp_index {
800 ensure!(
802 index != ts_index,
803 error::InvalidAlterRequestSnafu {
804 table: table_name,
805 err: format!(
806 "Not allowed to change timestamp index column '{}' datatype",
807 column.name
808 )
809 }
810 );
811 }
812
813 ensure!(
814 modify_column_types
815 .insert(&col_to_change.column_name, col_to_change)
816 .is_none(),
817 error::InvalidAlterRequestSnafu {
818 table: table_name,
819 err: format!(
820 "change column datatype {} more than once",
821 col_to_change.column_name
822 ),
823 }
824 );
825
826 ensure!(
827 column
828 .data_type
829 .can_arrow_type_cast_to(&col_to_change.target_type),
830 error::InvalidAlterRequestSnafu {
831 table: table_name,
832 err: format!(
833 "column '{}' cannot be cast automatically to type '{}'",
834 col_to_change.column_name, col_to_change.target_type,
835 ),
836 }
837 );
838
839 ensure!(
840 column.is_nullable(),
841 error::InvalidAlterRequestSnafu {
842 table: table_name,
843 err: format!(
844 "column '{}' must be nullable to ensure safe conversion.",
845 col_to_change.column_name,
846 ),
847 }
848 );
849 }
850 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
853 for mut column in table_schema.column_schemas().iter().cloned() {
854 if let Some(change_column) = modify_column_types.get(&column.name) {
855 column.data_type = change_column.target_type.clone();
856 let new_default = if let Some(default_value) = column.default_constraint() {
857 Some(
858 default_value
859 .cast_to_datatype(&change_column.target_type)
860 .with_context(|_| error::CastDefaultValueSnafu {
861 reason: format!(
862 "Failed to cast default value from {:?} to type {:?}",
863 default_value, &change_column.target_type
864 ),
865 })?,
866 )
867 } else {
868 None
869 };
870 column = column
871 .clone()
872 .with_default_constraint(new_default.clone())
873 .with_context(|_| error::CastDefaultValueSnafu {
874 reason: format!("Failed to set new default: {:?}", new_default),
875 })?;
876 }
877 columns.push(column)
878 }
879
880 let mut builder = SchemaBuilder::try_from_columns(columns)
881 .with_context(|_| error::SchemaBuildSnafu {
882 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
883 })?
884 .version(table_schema.version() + 1);
886 for (k, v) in table_schema.metadata().iter() {
887 builder = builder.add_metadata(k, v);
888 }
889 let new_schema = builder.build().with_context(|_| {
890 let column_names: Vec<_> = requests
891 .iter()
892 .map(|request| &request.column_name)
893 .collect();
894
895 error::SchemaBuildSnafu {
896 msg: format!(
897 "Table {table_name} cannot change datatype with columns {column_names:?}"
898 ),
899 }
900 })?;
901
902 let _ = meta_builder
903 .schema(Arc::new(new_schema))
904 .primary_key_indices(self.primary_key_indices.clone());
905
906 Ok(meta_builder)
907 }
908
909 fn split_requests_by_column_location<'a>(
911 &self,
912 table_name: &str,
913 requests: &'a [AddColumnRequest],
914 ) -> Result<SplitResult<'a>> {
915 let table_schema = &self.schema;
916 let mut columns_at_first = Vec::new();
917 let mut columns_at_after = HashMap::new();
918 let mut columns_at_last = Vec::new();
919 let mut column_names = Vec::with_capacity(requests.len());
920 for request in requests {
921 let column_name = &request.column_schema.name;
923 column_names.push(column_name.clone());
924 ensure!(
925 table_schema.column_schema_by_name(column_name).is_none(),
926 error::ColumnExistsSnafu {
927 column_name,
928 table_name,
929 }
930 );
931 match request.location.as_ref() {
932 Some(AddColumnLocation::First) => {
933 columns_at_first.push(request);
934 }
935 Some(AddColumnLocation::After { column_name }) => {
936 ensure!(
937 table_schema.column_schema_by_name(column_name).is_some(),
938 error::ColumnNotExistsSnafu {
939 column_name,
940 table_name,
941 }
942 );
943 columns_at_after
944 .entry(column_name.clone())
945 .or_insert(Vec::new())
946 .push(request);
947 }
948 None => {
949 columns_at_last.push(request);
950 }
951 }
952 }
953 Ok(SplitResult {
954 columns_at_first,
955 columns_at_after,
956 columns_at_last,
957 column_names,
958 })
959 }
960
961 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
962 let table_schema = &self.schema;
963 let mut meta_builder = self.new_meta_builder();
964 let mut columns = Vec::with_capacity(table_schema.num_columns());
965 for column_schema in table_schema.column_schemas() {
966 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
967 ensure!(
969 column_schema.default_constraint().is_some(),
970 error::InvalidAlterRequestSnafu {
971 table: table_name,
972 err: format!("column {name} does not have a default value"),
973 }
974 );
975 if !column_schema.is_nullable() {
976 return error::InvalidAlterRequestSnafu {
977 table: table_name,
978 err: format!(
979 "column {name} is not nullable and `default` cannot be dropped",
980 ),
981 }
982 .fail();
983 }
984 let new_column_schema = column_schema.clone();
985 let new_column_schema = new_column_schema
986 .with_default_constraint(None)
987 .with_context(|_| error::SchemaBuildSnafu {
988 msg: format!("Table {table_name} cannot drop default values"),
989 })?;
990 columns.push(new_column_schema);
991 } else {
992 columns.push(column_schema.clone());
993 }
994 }
995
996 let mut builder = SchemaBuilder::try_from_columns(columns)
997 .with_context(|_| error::SchemaBuildSnafu {
998 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
999 })?
1000 .version(table_schema.version() + 1);
1002 for (k, v) in table_schema.metadata().iter() {
1003 builder = builder.add_metadata(k, v);
1004 }
1005 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1006 msg: format!("Table {table_name} cannot drop default values"),
1007 })?;
1008
1009 let _ = meta_builder.schema(Arc::new(new_schema));
1010
1011 Ok(meta_builder)
1012 }
1013
1014 fn set_defaults(
1015 &self,
1016 table_name: &str,
1017 set_defaults: &[SetDefaultRequest],
1018 ) -> Result<TableMetaBuilder> {
1019 let table_schema = &self.schema;
1020 let mut meta_builder = self.new_meta_builder();
1021 let mut columns = Vec::with_capacity(table_schema.num_columns());
1022 for column_schema in table_schema.column_schemas() {
1023 if let Some(set_default) = set_defaults
1024 .iter()
1025 .find(|s| s.column_name == column_schema.name)
1026 {
1027 let new_column_schema = column_schema.clone();
1028 let new_column_schema = new_column_schema
1029 .with_default_constraint(set_default.default_constraint.clone())
1030 .with_context(|_| error::SchemaBuildSnafu {
1031 msg: format!("Table {table_name} cannot set default values"),
1032 })?;
1033 columns.push(new_column_schema);
1034 } else {
1035 columns.push(column_schema.clone());
1036 }
1037 }
1038
1039 let mut builder = SchemaBuilder::try_from_columns(columns)
1040 .with_context(|_| error::SchemaBuildSnafu {
1041 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1042 })?
1043 .version(table_schema.version() + 1);
1045 for (k, v) in table_schema.metadata().iter() {
1046 builder = builder.add_metadata(k, v);
1047 }
1048 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1049 msg: format!("Table {table_name} cannot set default values"),
1050 })?;
1051
1052 let _ = meta_builder.schema(Arc::new(new_schema));
1053
1054 Ok(meta_builder)
1055 }
1056}
1057
1058#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1059#[builder(pattern = "owned")]
1060pub struct TableInfo {
1061 #[builder(default, setter(into))]
1063 pub ident: TableIdent,
1064 #[builder(setter(into))]
1066 pub name: String,
1067 #[builder(default, setter(into))]
1069 pub desc: Option<String>,
1070 #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1071 pub catalog_name: String,
1072 #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1073 pub schema_name: String,
1074 pub meta: TableMeta,
1075 #[builder(default = "TableType::Base")]
1076 pub table_type: TableType,
1077}
1078
1079pub type TableInfoRef = Arc<TableInfo>;
1080
1081impl TableInfo {
1082 pub fn table_id(&self) -> TableId {
1083 self.ident.table_id
1084 }
1085
1086 pub fn region_ids(&self) -> Vec<RegionId> {
1087 self.meta
1088 .region_numbers
1089 .iter()
1090 .map(|id| RegionId::new(self.table_id(), *id))
1091 .collect()
1092 }
1093 pub fn full_table_name(&self) -> String {
1095 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1096 }
1097
1098 pub fn get_db_string(&self) -> String {
1099 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1100 }
1101
1102 pub fn is_physical_table(&self) -> bool {
1104 self.meta
1105 .options
1106 .extra_options
1107 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1108 }
1109
1110 pub fn is_ttl_instant_table(&self) -> bool {
1112 self.meta
1113 .options
1114 .ttl
1115 .map(|t| t.is_instant())
1116 .unwrap_or(false)
1117 }
1118}
1119
1120impl TableInfoBuilder {
1121 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1122 Self {
1123 name: Some(name.into()),
1124 meta: Some(meta),
1125 ..Default::default()
1126 }
1127 }
1128
1129 pub fn table_id(mut self, id: TableId) -> Self {
1130 let ident = self.ident.get_or_insert_with(TableIdent::default);
1131 ident.table_id = id;
1132 self
1133 }
1134
1135 pub fn table_version(mut self, version: TableVersion) -> Self {
1136 let ident = self.ident.get_or_insert_with(TableIdent::default);
1137 ident.version = version;
1138 self
1139 }
1140}
1141
1142impl TableIdent {
1143 pub fn new(table_id: TableId) -> Self {
1144 Self {
1145 table_id,
1146 version: 0,
1147 }
1148 }
1149}
1150
1151impl From<TableId> for TableIdent {
1152 fn from(table_id: TableId) -> Self {
1153 Self::new(table_id)
1154 }
1155}
1156
1157#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
1159pub struct RawTableMeta {
1160 pub schema: RawSchema,
1161 pub primary_key_indices: Vec<usize>,
1164 pub value_indices: Vec<usize>,
1167 pub engine: String,
1169 pub next_column_id: ColumnId,
1172 pub region_numbers: Vec<u32>,
1173 pub options: TableOptions,
1174 pub created_on: DateTime<Utc>,
1175 pub updated_on: DateTime<Utc>,
1176 #[serde(default)]
1178 pub partition_key_indices: Vec<usize>,
1179 #[serde(default)]
1182 pub column_ids: Vec<ColumnId>,
1183}
1184
1185impl<'de> Deserialize<'de> for RawTableMeta {
1186 fn deserialize<D>(
1187 deserializer: D,
1188 ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1189 where
1190 D: Deserializer<'de>,
1191 {
1192 #[derive(Deserialize)]
1193 struct Helper {
1194 schema: RawSchema,
1195 primary_key_indices: Vec<usize>,
1196 value_indices: Vec<usize>,
1197 engine: String,
1198 next_column_id: u32,
1199 region_numbers: Vec<u32>,
1200 options: TableOptions,
1201 created_on: DateTime<Utc>,
1202 updated_on: Option<DateTime<Utc>>,
1203 #[serde(default)]
1204 partition_key_indices: Vec<usize>,
1205 #[serde(default)]
1206 column_ids: Vec<ColumnId>,
1207 }
1208
1209 let h = Helper::deserialize(deserializer)?;
1210 Ok(RawTableMeta {
1211 schema: h.schema,
1212 primary_key_indices: h.primary_key_indices,
1213 value_indices: h.value_indices,
1214 engine: h.engine,
1215 next_column_id: h.next_column_id,
1216 region_numbers: h.region_numbers,
1217 options: h.options,
1218 created_on: h.created_on,
1219 updated_on: h.updated_on.unwrap_or(h.created_on),
1220 partition_key_indices: h.partition_key_indices,
1221 column_ids: h.column_ids,
1222 })
1223 }
1224}
1225
1226impl From<TableMeta> for RawTableMeta {
1227 fn from(meta: TableMeta) -> RawTableMeta {
1228 RawTableMeta {
1229 schema: RawSchema::from(&*meta.schema),
1230 primary_key_indices: meta.primary_key_indices,
1231 value_indices: meta.value_indices,
1232 engine: meta.engine,
1233 next_column_id: meta.next_column_id,
1234 region_numbers: meta.region_numbers,
1235 options: meta.options,
1236 created_on: meta.created_on,
1237 updated_on: meta.updated_on,
1238 partition_key_indices: meta.partition_key_indices,
1239 column_ids: meta.column_ids,
1240 }
1241 }
1242}
1243
1244impl TryFrom<RawTableMeta> for TableMeta {
1245 type Error = ConvertError;
1246
1247 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1248 Ok(TableMeta {
1249 schema: Arc::new(Schema::try_from(raw.schema)?),
1250 primary_key_indices: raw.primary_key_indices,
1251 value_indices: raw.value_indices,
1252 engine: raw.engine,
1253 region_numbers: raw.region_numbers,
1254 next_column_id: raw.next_column_id,
1255 options: raw.options,
1256 created_on: raw.created_on,
1257 updated_on: raw.updated_on,
1258 partition_key_indices: raw.partition_key_indices,
1259 column_ids: raw.column_ids,
1260 })
1261 }
1262}
1263
1264#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1266pub struct RawTableInfo {
1267 pub ident: TableIdent,
1268 pub name: String,
1269 pub desc: Option<String>,
1270 pub catalog_name: String,
1271 pub schema_name: String,
1272 pub meta: RawTableMeta,
1273 pub table_type: TableType,
1274}
1275
1276impl RawTableInfo {
1277 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1281 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1282 None
1283 } else {
1284 Some(
1285 self.meta
1286 .column_ids
1287 .iter()
1288 .enumerate()
1289 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1290 .collect(),
1291 )
1292 }
1293 }
1294
1295 pub fn sort_columns(&mut self) {
1297 let column_schemas = &self.meta.schema.column_schemas;
1298 let primary_keys = self
1299 .meta
1300 .primary_key_indices
1301 .iter()
1302 .map(|index| column_schemas[*index].name.clone())
1303 .collect::<HashSet<_>>();
1304
1305 let name_to_ids = self.name_to_ids().unwrap_or_default();
1306 self.meta
1307 .schema
1308 .column_schemas
1309 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1310
1311 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1313 let mut timestamp_index = None;
1314 let mut value_indices =
1315 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1316 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1317 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1318 if primary_keys.contains(&column_schema.name) {
1319 primary_key_indices.push(index);
1320 } else if column_schema.is_time_index() {
1321 value_indices.push(index);
1322 timestamp_index = Some(index);
1323 } else {
1324 value_indices.push(index);
1325 }
1326 if let Some(id) = name_to_ids.get(&column_schema.name) {
1327 column_ids.push(*id);
1328 }
1329 }
1330
1331 self.meta.schema.timestamp_index = timestamp_index;
1333 self.meta.primary_key_indices = primary_key_indices;
1334 self.meta.value_indices = value_indices;
1335 self.meta.column_ids = column_ids;
1336 }
1337
1338 pub fn to_region_options(&self) -> HashMap<String, String> {
1342 HashMap::from(&self.meta.options)
1343 }
1344
1345 pub fn table_ref(&self) -> TableReference<'_> {
1347 TableReference::full(
1348 self.catalog_name.as_str(),
1349 self.schema_name.as_str(),
1350 self.name.as_str(),
1351 )
1352 }
1353}
1354
1355impl From<TableInfo> for RawTableInfo {
1356 fn from(info: TableInfo) -> RawTableInfo {
1357 RawTableInfo {
1358 ident: info.ident,
1359 name: info.name,
1360 desc: info.desc,
1361 catalog_name: info.catalog_name,
1362 schema_name: info.schema_name,
1363 meta: RawTableMeta::from(info.meta),
1364 table_type: info.table_type,
1365 }
1366 }
1367}
1368
1369impl TryFrom<RawTableInfo> for TableInfo {
1370 type Error = ConvertError;
1371
1372 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1373 Ok(TableInfo {
1374 ident: raw.ident,
1375 name: raw.name,
1376 desc: raw.desc,
1377 catalog_name: raw.catalog_name,
1378 schema_name: raw.schema_name,
1379 meta: TableMeta::try_from(raw.meta)?,
1380 table_type: raw.table_type,
1381 })
1382 }
1383}
1384
1385fn set_column_fulltext_options(
1394 column_schema: &mut ColumnSchema,
1395 column_name: &str,
1396 options: &FulltextOptions,
1397 current_options: Option<FulltextOptions>,
1398) -> Result<()> {
1399 if let Some(current_options) = current_options {
1400 ensure!(
1401 current_options.analyzer == options.analyzer
1402 && current_options.case_sensitive == options.case_sensitive,
1403 error::InvalidColumnOptionSnafu {
1404 column_name,
1405 msg: format!(
1406 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1407 current_options.analyzer, current_options.case_sensitive
1408 ),
1409 }
1410 );
1411 }
1412
1413 column_schema
1414 .set_fulltext_options(options)
1415 .context(error::SetFulltextOptionsSnafu { column_name })?;
1416
1417 Ok(())
1418}
1419
1420fn unset_column_fulltext_options(
1421 column_schema: &mut ColumnSchema,
1422 column_name: &str,
1423 current_options: Option<FulltextOptions>,
1424) -> Result<()> {
1425 ensure!(
1426 current_options
1427 .as_ref()
1428 .is_some_and(|options| options.enable),
1429 error::InvalidColumnOptionSnafu {
1430 column_name,
1431 msg: "FULLTEXT index already disabled".to_string(),
1432 }
1433 );
1434
1435 let mut options = current_options.unwrap();
1436 options.enable = false;
1437 column_schema
1438 .set_fulltext_options(&options)
1439 .context(error::SetFulltextOptionsSnafu { column_name })?;
1440
1441 Ok(())
1442}
1443
1444fn set_column_skipping_index_options(
1445 column_schema: &mut ColumnSchema,
1446 column_name: &str,
1447 options: &SkippingIndexOptions,
1448) -> Result<()> {
1449 column_schema
1450 .set_skipping_options(options)
1451 .context(error::SetSkippingOptionsSnafu { column_name })?;
1452
1453 Ok(())
1454}
1455
1456fn unset_column_skipping_index_options(
1457 column_schema: &mut ColumnSchema,
1458 column_name: &str,
1459) -> Result<()> {
1460 column_schema
1461 .unset_skipping_options()
1462 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1463 Ok(())
1464}
1465
1466#[cfg(test)]
1467mod tests {
1468 use std::assert_matches::assert_matches;
1469
1470 use common_error::ext::ErrorExt;
1471 use common_error::status_code::StatusCode;
1472 use datatypes::data_type::ConcreteDataType;
1473 use datatypes::schema::{
1474 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1475 };
1476
1477 use super::*;
1478 use crate::Error;
1479
1480 fn new_test_schema() -> Schema {
1482 let column_schemas = vec![
1483 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1484 ColumnSchema::new(
1485 "ts",
1486 ConcreteDataType::timestamp_millisecond_datatype(),
1487 false,
1488 )
1489 .with_time_index(true),
1490 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1491 ];
1492 SchemaBuilder::try_from(column_schemas)
1493 .unwrap()
1494 .version(123)
1495 .build()
1496 .unwrap()
1497 }
1498
1499 #[test]
1500 fn test_raw_convert() {
1501 let schema = Arc::new(new_test_schema());
1502 let meta = TableMetaBuilder::empty()
1503 .schema(schema)
1504 .primary_key_indices(vec![0])
1505 .engine("engine")
1506 .next_column_id(3)
1507 .build()
1508 .unwrap();
1509 let info = TableInfoBuilder::default()
1510 .table_id(10)
1511 .table_version(5)
1512 .name("mytable")
1513 .meta(meta)
1514 .build()
1515 .unwrap();
1516
1517 let raw = RawTableInfo::from(info.clone());
1518 let info_new = TableInfo::try_from(raw).unwrap();
1519
1520 assert_eq!(info, info_new);
1521 }
1522
1523 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1524 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1525 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1526 let alter_kind = AlterKind::AddColumns {
1527 columns: vec![
1528 AddColumnRequest {
1529 column_schema: new_tag,
1530 is_key: true,
1531 location: None,
1532 add_if_not_exists: false,
1533 },
1534 AddColumnRequest {
1535 column_schema: new_field,
1536 is_key: false,
1537 location: None,
1538 add_if_not_exists: false,
1539 },
1540 ],
1541 };
1542
1543 let builder = meta
1544 .builder_with_alter_kind("my_table", &alter_kind)
1545 .unwrap();
1546 builder.build().unwrap()
1547 }
1548
1549 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1550 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1551 let new_field = ColumnSchema::new(
1552 "my_field_after_ts",
1553 ConcreteDataType::string_datatype(),
1554 true,
1555 );
1556 let yet_another_field = ColumnSchema::new(
1557 "yet_another_field_after_ts",
1558 ConcreteDataType::int64_datatype(),
1559 true,
1560 );
1561 let alter_kind = AlterKind::AddColumns {
1562 columns: vec![
1563 AddColumnRequest {
1564 column_schema: new_tag,
1565 is_key: true,
1566 location: Some(AddColumnLocation::First),
1567 add_if_not_exists: false,
1568 },
1569 AddColumnRequest {
1570 column_schema: new_field,
1571 is_key: false,
1572 location: Some(AddColumnLocation::After {
1573 column_name: "ts".to_string(),
1574 }),
1575 add_if_not_exists: false,
1576 },
1577 AddColumnRequest {
1578 column_schema: yet_another_field,
1579 is_key: true,
1580 location: Some(AddColumnLocation::After {
1581 column_name: "ts".to_string(),
1582 }),
1583 add_if_not_exists: false,
1584 },
1585 ],
1586 };
1587
1588 let builder = meta
1589 .builder_with_alter_kind("my_table", &alter_kind)
1590 .unwrap();
1591 builder.build().unwrap()
1592 }
1593
1594 #[test]
1595 fn test_add_columns() {
1596 let schema = Arc::new(new_test_schema());
1597 let meta = TableMetaBuilder::empty()
1598 .schema(schema)
1599 .primary_key_indices(vec![0])
1600 .engine("engine")
1601 .next_column_id(3)
1602 .build()
1603 .unwrap();
1604
1605 let new_meta = add_columns_to_meta(&meta);
1606 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1607
1608 let names: Vec<String> = new_meta
1609 .schema
1610 .column_schemas()
1611 .iter()
1612 .map(|column_schema| column_schema.name.clone())
1613 .collect();
1614 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1615 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1616 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1617 }
1618
1619 #[test]
1620 fn test_add_columns_multiple_times() {
1621 let schema = Arc::new(new_test_schema());
1622 let meta = TableMetaBuilder::empty()
1623 .schema(schema)
1624 .primary_key_indices(vec![0])
1625 .engine("engine")
1626 .next_column_id(3)
1627 .build()
1628 .unwrap();
1629
1630 let alter_kind = AlterKind::AddColumns {
1631 columns: vec![
1632 AddColumnRequest {
1633 column_schema: ColumnSchema::new(
1634 "col3",
1635 ConcreteDataType::int32_datatype(),
1636 true,
1637 ),
1638 is_key: true,
1639 location: None,
1640 add_if_not_exists: true,
1641 },
1642 AddColumnRequest {
1643 column_schema: ColumnSchema::new(
1644 "col3",
1645 ConcreteDataType::int32_datatype(),
1646 true,
1647 ),
1648 is_key: true,
1649 location: None,
1650 add_if_not_exists: true,
1651 },
1652 ],
1653 };
1654 let err = meta
1655 .builder_with_alter_kind("my_table", &alter_kind)
1656 .err()
1657 .unwrap();
1658 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1659 }
1660
1661 #[test]
1662 fn test_remove_columns() {
1663 let schema = Arc::new(new_test_schema());
1664 let meta = TableMetaBuilder::empty()
1665 .schema(schema.clone())
1666 .primary_key_indices(vec![0])
1667 .engine("engine")
1668 .next_column_id(3)
1669 .build()
1670 .unwrap();
1671 let meta = add_columns_to_meta(&meta);
1673
1674 let alter_kind = AlterKind::DropColumns {
1675 names: vec![String::from("col2"), String::from("my_field")],
1676 };
1677 let new_meta = meta
1678 .builder_with_alter_kind("my_table", &alter_kind)
1679 .unwrap()
1680 .build()
1681 .unwrap();
1682
1683 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1684
1685 let names: Vec<String> = new_meta
1686 .schema
1687 .column_schemas()
1688 .iter()
1689 .map(|column_schema| column_schema.name.clone())
1690 .collect();
1691 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1692 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1693 assert_eq!(&[1], &new_meta.value_indices[..]);
1694 assert_eq!(
1695 schema.timestamp_column(),
1696 new_meta.schema.timestamp_column()
1697 );
1698 }
1699
1700 #[test]
1701 fn test_remove_multiple_columns_before_timestamp() {
1702 let column_schemas = vec![
1703 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1704 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1705 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1706 ColumnSchema::new(
1707 "ts",
1708 ConcreteDataType::timestamp_millisecond_datatype(),
1709 false,
1710 )
1711 .with_time_index(true),
1712 ];
1713 let schema = Arc::new(
1714 SchemaBuilder::try_from(column_schemas)
1715 .unwrap()
1716 .version(123)
1717 .build()
1718 .unwrap(),
1719 );
1720 let meta = TableMetaBuilder::empty()
1721 .schema(schema.clone())
1722 .primary_key_indices(vec![1])
1723 .engine("engine")
1724 .next_column_id(4)
1725 .build()
1726 .unwrap();
1727
1728 let alter_kind = AlterKind::DropColumns {
1730 names: vec![String::from("col3"), String::from("col1")],
1731 };
1732 let new_meta = meta
1733 .builder_with_alter_kind("my_table", &alter_kind)
1734 .unwrap()
1735 .build()
1736 .unwrap();
1737
1738 let names: Vec<String> = new_meta
1739 .schema
1740 .column_schemas()
1741 .iter()
1742 .map(|column_schema| column_schema.name.clone())
1743 .collect();
1744 assert_eq!(&["col2", "ts"], &names[..]);
1745 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1746 assert_eq!(&[1], &new_meta.value_indices[..]);
1747 assert_eq!(
1748 schema.timestamp_column(),
1749 new_meta.schema.timestamp_column()
1750 );
1751 }
1752
1753 #[test]
1754 fn test_add_existing_column() {
1755 let schema = Arc::new(new_test_schema());
1756 let meta = TableMetaBuilder::empty()
1757 .schema(schema)
1758 .primary_key_indices(vec![0])
1759 .engine("engine")
1760 .next_column_id(3)
1761 .build()
1762 .unwrap();
1763
1764 let alter_kind = AlterKind::AddColumns {
1765 columns: vec![AddColumnRequest {
1766 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1767 is_key: false,
1768 location: None,
1769 add_if_not_exists: false,
1770 }],
1771 };
1772
1773 let err = meta
1774 .builder_with_alter_kind("my_table", &alter_kind)
1775 .err()
1776 .unwrap();
1777 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1778
1779 let alter_kind = AlterKind::AddColumns {
1781 columns: vec![AddColumnRequest {
1782 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1783 is_key: true,
1784 location: None,
1785 add_if_not_exists: true,
1786 }],
1787 };
1788 let new_meta = meta
1789 .builder_with_alter_kind("my_table", &alter_kind)
1790 .unwrap()
1791 .build()
1792 .unwrap();
1793 assert_eq!(
1794 meta.schema.column_schemas(),
1795 new_meta.schema.column_schemas()
1796 );
1797 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1798 }
1799
1800 #[test]
1801 fn test_add_different_type_column() {
1802 let schema = Arc::new(new_test_schema());
1803 let meta = TableMetaBuilder::empty()
1804 .schema(schema)
1805 .primary_key_indices(vec![0])
1806 .engine("engine")
1807 .next_column_id(3)
1808 .build()
1809 .unwrap();
1810
1811 let alter_kind = AlterKind::AddColumns {
1813 columns: vec![AddColumnRequest {
1814 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1815 is_key: false,
1816 location: None,
1817 add_if_not_exists: true,
1818 }],
1819 };
1820 let err = meta
1821 .builder_with_alter_kind("my_table", &alter_kind)
1822 .err()
1823 .unwrap();
1824 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1825 }
1826
1827 #[test]
1828 fn test_add_invalid_column() {
1829 let schema = Arc::new(new_test_schema());
1830 let meta = TableMetaBuilder::empty()
1831 .schema(schema)
1832 .primary_key_indices(vec![0])
1833 .engine("engine")
1834 .next_column_id(3)
1835 .build()
1836 .unwrap();
1837
1838 let alter_kind = AlterKind::AddColumns {
1840 columns: vec![AddColumnRequest {
1841 column_schema: ColumnSchema::new(
1842 "weny",
1843 ConcreteDataType::string_datatype(),
1844 false,
1845 ),
1846 is_key: false,
1847 location: None,
1848 add_if_not_exists: false,
1849 }],
1850 };
1851
1852 let err = meta
1853 .builder_with_alter_kind("my_table", &alter_kind)
1854 .err()
1855 .unwrap();
1856 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1857 }
1858
1859 #[test]
1860 fn test_remove_unknown_column() {
1861 let schema = Arc::new(new_test_schema());
1862 let meta = TableMetaBuilder::empty()
1863 .schema(schema)
1864 .primary_key_indices(vec![0])
1865 .engine("engine")
1866 .next_column_id(3)
1867 .build()
1868 .unwrap();
1869
1870 let alter_kind = AlterKind::DropColumns {
1871 names: vec![String::from("unknown")],
1872 };
1873
1874 let err = meta
1875 .builder_with_alter_kind("my_table", &alter_kind)
1876 .err()
1877 .unwrap();
1878 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1879 }
1880
1881 #[test]
1882 fn test_change_unknown_column_data_type() {
1883 let schema = Arc::new(new_test_schema());
1884 let meta = TableMetaBuilder::empty()
1885 .schema(schema)
1886 .primary_key_indices(vec![0])
1887 .engine("engine")
1888 .next_column_id(3)
1889 .build()
1890 .unwrap();
1891
1892 let alter_kind = AlterKind::ModifyColumnTypes {
1893 columns: vec![ModifyColumnTypeRequest {
1894 column_name: "unknown".to_string(),
1895 target_type: ConcreteDataType::string_datatype(),
1896 }],
1897 };
1898
1899 let err = meta
1900 .builder_with_alter_kind("my_table", &alter_kind)
1901 .err()
1902 .unwrap();
1903 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1904 }
1905
1906 #[test]
1907 fn test_remove_key_column() {
1908 let schema = Arc::new(new_test_schema());
1909 let meta = TableMetaBuilder::empty()
1910 .schema(schema)
1911 .primary_key_indices(vec![0])
1912 .engine("engine")
1913 .next_column_id(3)
1914 .build()
1915 .unwrap();
1916
1917 let alter_kind = AlterKind::DropColumns {
1919 names: vec![String::from("col1")],
1920 };
1921
1922 let err = meta
1923 .builder_with_alter_kind("my_table", &alter_kind)
1924 .err()
1925 .unwrap();
1926 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1927
1928 let alter_kind = AlterKind::DropColumns {
1930 names: vec![String::from("ts")],
1931 };
1932
1933 let err = meta
1934 .builder_with_alter_kind("my_table", &alter_kind)
1935 .err()
1936 .unwrap();
1937 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1938 }
1939
1940 #[test]
1941 fn test_remove_partition_column() {
1942 let schema = Arc::new(new_test_schema());
1943 let meta = TableMetaBuilder::empty()
1944 .schema(schema)
1945 .primary_key_indices(vec![])
1946 .partition_key_indices(vec![0])
1947 .engine("engine")
1948 .next_column_id(3)
1949 .build()
1950 .unwrap();
1951 let alter_kind = AlterKind::DropColumns {
1953 names: vec![String::from("col1")],
1954 };
1955
1956 let err = meta
1957 .builder_with_alter_kind("my_table", &alter_kind)
1958 .err()
1959 .unwrap();
1960 assert_matches!(err, Error::RemovePartitionColumn { .. });
1961 }
1962
1963 #[test]
1964 fn test_change_key_column_data_type() {
1965 let schema = Arc::new(new_test_schema());
1966 let meta = TableMetaBuilder::empty()
1967 .schema(schema)
1968 .primary_key_indices(vec![0])
1969 .engine("engine")
1970 .next_column_id(3)
1971 .build()
1972 .unwrap();
1973
1974 let alter_kind = AlterKind::ModifyColumnTypes {
1976 columns: vec![ModifyColumnTypeRequest {
1977 column_name: "col1".to_string(),
1978 target_type: ConcreteDataType::string_datatype(),
1979 }],
1980 };
1981
1982 let err = meta
1983 .builder_with_alter_kind("my_table", &alter_kind)
1984 .err()
1985 .unwrap();
1986 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1987
1988 let alter_kind = AlterKind::ModifyColumnTypes {
1990 columns: vec![ModifyColumnTypeRequest {
1991 column_name: "ts".to_string(),
1992 target_type: ConcreteDataType::string_datatype(),
1993 }],
1994 };
1995
1996 let err = meta
1997 .builder_with_alter_kind("my_table", &alter_kind)
1998 .err()
1999 .unwrap();
2000 assert_eq!(StatusCode::InvalidArguments, err.status_code());
2001 }
2002
2003 #[test]
2004 fn test_alloc_new_column() {
2005 let schema = Arc::new(new_test_schema());
2006 let mut meta = TableMetaBuilder::empty()
2007 .schema(schema)
2008 .primary_key_indices(vec![0])
2009 .engine("engine")
2010 .next_column_id(3)
2011 .build()
2012 .unwrap();
2013 assert_eq!(3, meta.next_column_id);
2014
2015 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
2016 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
2017
2018 assert_eq!(4, meta.next_column_id);
2019 assert_eq!(column_schema.name, desc.name);
2020 }
2021
2022 #[test]
2023 fn test_add_columns_with_location() {
2024 let schema = Arc::new(new_test_schema());
2025 let meta = TableMetaBuilder::empty()
2026 .schema(schema)
2027 .primary_key_indices(vec![0])
2028 .partition_key_indices(vec![0, 2])
2030 .engine("engine")
2031 .next_column_id(3)
2032 .build()
2033 .unwrap();
2034
2035 let new_meta = add_columns_to_meta_with_location(&meta);
2036 assert_eq!(meta.region_numbers, new_meta.region_numbers);
2037
2038 let names: Vec<String> = new_meta
2039 .schema
2040 .column_schemas()
2041 .iter()
2042 .map(|column_schema| column_schema.name.clone())
2043 .collect();
2044 assert_eq!(
2045 &[
2046 "my_tag_first", "col1", "ts", "yet_another_field_after_ts", "my_field_after_ts", "col2", ],
2053 &names[..]
2054 );
2055 assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2056 assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2057 assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2058 }
2059
2060 #[test]
2061 fn test_modify_column_fulltext_options() {
2062 let schema = Arc::new(new_test_schema());
2063 let meta = TableMetaBuilder::empty()
2064 .schema(schema)
2065 .primary_key_indices(vec![0])
2066 .engine("engine")
2067 .next_column_id(3)
2068 .build()
2069 .unwrap();
2070
2071 let alter_kind = AlterKind::SetIndexes {
2072 options: vec![SetIndexOption::Fulltext {
2073 column_name: "col1".to_string(),
2074 options: FulltextOptions::default(),
2075 }],
2076 };
2077 let err = meta
2078 .builder_with_alter_kind("my_table", &alter_kind)
2079 .err()
2080 .unwrap();
2081 assert_eq!(
2082 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2083 err.to_string()
2084 );
2085
2086 let new_meta = add_columns_to_meta_with_location(&meta);
2088 assert_eq!(meta.region_numbers, new_meta.region_numbers);
2089
2090 let alter_kind = AlterKind::SetIndexes {
2091 options: vec![SetIndexOption::Fulltext {
2092 column_name: "my_tag_first".to_string(),
2093 options: FulltextOptions::new_unchecked(
2094 true,
2095 FulltextAnalyzer::Chinese,
2096 true,
2097 FulltextBackend::Bloom,
2098 1000,
2099 0.01,
2100 ),
2101 }],
2102 };
2103 let new_meta = new_meta
2104 .builder_with_alter_kind("my_table", &alter_kind)
2105 .unwrap()
2106 .build()
2107 .unwrap();
2108 let column_schema = new_meta
2109 .schema
2110 .column_schema_by_name("my_tag_first")
2111 .unwrap();
2112 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2113 assert!(fulltext_options.enable);
2114 assert_eq!(
2115 datatypes::schema::FulltextAnalyzer::Chinese,
2116 fulltext_options.analyzer
2117 );
2118 assert!(fulltext_options.case_sensitive);
2119
2120 let alter_kind = AlterKind::UnsetIndexes {
2121 options: vec![UnsetIndexOption::Fulltext {
2122 column_name: "my_tag_first".to_string(),
2123 }],
2124 };
2125 let new_meta = new_meta
2126 .builder_with_alter_kind("my_table", &alter_kind)
2127 .unwrap()
2128 .build()
2129 .unwrap();
2130 let column_schema = new_meta
2131 .schema
2132 .column_schema_by_name("my_tag_first")
2133 .unwrap();
2134 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2135 assert!(!fulltext_options.enable);
2136 }
2137}