1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
/// Numeric identifier of a table (see [`TableIdent::table_id`]).
pub type TableId = u32;
/// Version number of a table (see [`TableIdent::version`]).
pub type TableVersion = u64;
45
/// Serde-serializable counterpart of DataFusion's [`TableProviderFilterPushDown`].
///
/// The two types convert into each other variant-by-variant through the `From`
/// implementations that follow this enum.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// Maps to `TableProviderFilterPushDown::Unsupported`.
    Unsupported,
    /// Maps to `TableProviderFilterPushDown::Inexact`.
    Inexact,
    /// Maps to `TableProviderFilterPushDown::Exact`.
    Exact,
}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
/// Kind of a table.
///
/// The `Display` implementation renders the variants as `BASE TABLE`, `VIEW`
/// and `TEMPORARY` respectively.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// A base table; this is the default `table_type` used by `TableInfoBuilder`.
    Base,
    /// A view.
    View,
    /// A temporary table.
    Temporary,
}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
/// Identity of a table: its id together with a version number.
///
/// `Default` yields id `0` and version `0`.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Id of the table.
    pub table_id: TableId,
    /// Version of the table; `TableIdent::new` starts it at `0`.
    pub version: TableVersion,
}
113
/// In-memory metadata of a table: schema, key/partition column positions, engine,
/// regions and options.
///
/// A `TableMetaBuilder` is derived for this struct (mutable setter pattern with a
/// custom constructor). The `*_indices` fields hold positions into
/// `schema.column_schemas()`.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table.
    pub schema: SchemaRef,
    /// Positions of the primary key columns in the schema.
    pub primary_key_indices: Vec<usize>,
    /// Positions of the value columns. When not set on the builder it defaults to every
    /// column position that is not in `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Name of the table engine; the builder defaults it to an empty string.
    #[builder(default, setter(into))]
    pub engine: String,
    /// Region numbers of the table; combined with the table id to form `RegionId`s
    /// in `TableInfo::region_ids`.
    #[builder(default, setter(into))]
    pub region_numbers: Vec<u32>,
    /// Id handed to the next column allocated through `alloc_new_column`.
    pub next_column_id: ColumnId,
    /// Table options.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation time; the builder defaults it to the current UTC time.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Positions of the partition columns in the schema; empty by default.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; empty by default.
    // NOTE(review): `RawTableInfo::name_to_ids` pairs these positionally with the schema's
    // columns when the lengths match — confirm the intended ordering contract.
    #[builder(default = "Vec::new()")]
    pub column_ids: Vec<ColumnId>,
}
141
impl TableMetaBuilder {
    /// Creates a builder with every field unset.
    ///
    /// Only compiled for tests or when the `testing` feature is enabled.
    #[cfg(any(test, feature = "testing"))]
    pub fn empty() -> Self {
        Self {
            schema: None,
            primary_key_indices: None,
            value_indices: None,
            engine: None,
            region_numbers: None,
            next_column_id: None,
            options: None,
            created_on: None,
            partition_key_indices: None,
            column_ids: None,
        }
    }
}
160
161impl TableMetaBuilder {
162 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
163 match (&self.primary_key_indices, &self.schema) {
164 (Some(v), Some(schema)) => {
165 let column_schemas = schema.column_schemas();
166 Ok((0..column_schemas.len())
167 .filter(|idx| !v.contains(idx))
168 .collect())
169 }
170 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
171 }
172 }
173
174 pub fn new_external_table() -> Self {
175 Self {
176 schema: None,
177 primary_key_indices: Some(Vec::new()),
178 value_indices: Some(Vec::new()),
179 engine: None,
180 region_numbers: Some(Vec::new()),
181 next_column_id: Some(0),
182 options: None,
183 created_on: None,
184 partition_key_indices: None,
185 column_ids: None,
186 }
187 }
188}
189
/// Add-column requests grouped by where the new column should be placed, as produced by
/// `TableMeta::split_requests_by_column_location`.
struct SplitResult<'a> {
    /// Requests whose location is `AddColumnLocation::First`, in request order.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Requests whose location is `AddColumnLocation::After`, keyed by the name of the
    /// existing column they follow; each list is in request order.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Requests without a location, in request order.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all requested columns, in request order.
    column_names: Vec<String>,
}
201
202impl TableMeta {
203 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
204 let columns_schemas = &self.schema.column_schemas();
205 self.primary_key_indices
206 .iter()
207 .map(|idx| &columns_schemas[*idx].name)
208 }
209
210 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
211 let columns_schemas = self.schema.column_schemas();
213 let primary_key_indices = &self.primary_key_indices;
214 columns_schemas
215 .iter()
216 .enumerate()
217 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
218 .map(|(_, cs)| &cs.name)
219 }
220
221 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
222 let columns_schemas = &self.schema.column_schemas();
223 self.partition_key_indices
224 .iter()
225 .map(|idx| &columns_schemas[*idx].name)
226 }
227
228 pub fn builder_with_alter_kind(
232 &self,
233 table_name: &str,
234 alter_kind: &AlterKind,
235 ) -> Result<TableMetaBuilder> {
236 match alter_kind {
237 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
238 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
239 AlterKind::ModifyColumnTypes { columns } => {
240 self.modify_column_types(table_name, columns)
241 }
242 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
244 AlterKind::SetTableOptions { options } => self.set_table_options(options),
245 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
246 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
247 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
248 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
249 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
250 }
251 }
252
253 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
255 let mut new_options = self.options.clone();
256
257 for request in requests {
258 match request {
259 SetRegionOption::Ttl(new_ttl) => {
260 new_options.ttl = *new_ttl;
261 }
262 SetRegionOption::Twsc(key, value) => {
263 if !value.is_empty() {
264 new_options
265 .extra_options
266 .insert(key.to_string(), value.to_string());
267 new_options.extra_options.insert(
269 COMPACTION_TYPE.to_string(),
270 COMPACTION_TYPE_TWCS.to_string(),
271 );
272 } else {
273 new_options.extra_options.remove(key.as_str());
275 }
276 }
277 }
278 }
279 let mut builder = self.new_meta_builder();
280 builder.options(new_options);
281
282 Ok(builder)
283 }
284
285 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
286 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
287 self.set_table_options(&requests)
288 }
289
290 fn set_indexes(
291 &self,
292 table_name: &str,
293 requests: &[SetIndexOption],
294 ) -> Result<TableMetaBuilder> {
295 let table_schema = &self.schema;
296 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
297 for request in requests {
298 let column_name = request.column_name();
299 table_schema
300 .column_index_by_name(column_name)
301 .with_context(|| error::ColumnNotExistsSnafu {
302 column_name,
303 table_name,
304 })?;
305 set_index_options
306 .entry(column_name)
307 .or_default()
308 .push(request);
309 }
310
311 let mut meta_builder = self.new_meta_builder();
312 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
313 for mut column in table_schema.column_schemas().iter().cloned() {
314 if let Some(request) = set_index_options.get(column.name.as_str()) {
315 for request in request {
316 self.set_index(&mut column, request)?;
317 }
318 }
319 columns.push(column);
320 }
321
322 let mut builder = SchemaBuilder::try_from_columns(columns)
323 .with_context(|_| error::SchemaBuildSnafu {
324 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
325 })?
326 .version(table_schema.version() + 1);
327
328 for (k, v) in table_schema.metadata().iter() {
329 builder = builder.add_metadata(k, v);
330 }
331
332 let new_schema = builder.build().with_context(|_| {
333 let column_names = requests
334 .iter()
335 .map(|request| request.column_name())
336 .collect::<Vec<_>>();
337 error::SchemaBuildSnafu {
338 msg: format!(
339 "Table {table_name} cannot set index options with columns {column_names:?}",
340 ),
341 }
342 })?;
343 let _ = meta_builder
344 .schema(Arc::new(new_schema))
345 .primary_key_indices(self.primary_key_indices.clone());
346
347 Ok(meta_builder)
348 }
349
350 fn unset_indexes(
351 &self,
352 table_name: &str,
353 requests: &[UnsetIndexOption],
354 ) -> Result<TableMetaBuilder> {
355 let table_schema = &self.schema;
356 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
357 for request in requests {
358 let column_name = request.column_name();
359 table_schema
360 .column_index_by_name(column_name)
361 .with_context(|| error::ColumnNotExistsSnafu {
362 column_name,
363 table_name,
364 })?;
365 set_index_options
366 .entry(column_name)
367 .or_default()
368 .push(request);
369 }
370
371 let mut meta_builder = self.new_meta_builder();
372 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
373 for mut column in table_schema.column_schemas().iter().cloned() {
374 if let Some(request) = set_index_options.get(column.name.as_str()) {
375 for request in request {
376 self.unset_index(&mut column, request)?;
377 }
378 }
379 columns.push(column);
380 }
381
382 let mut builder = SchemaBuilder::try_from_columns(columns)
383 .with_context(|_| error::SchemaBuildSnafu {
384 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
385 })?
386 .version(table_schema.version() + 1);
387
388 for (k, v) in table_schema.metadata().iter() {
389 builder = builder.add_metadata(k, v);
390 }
391
392 let new_schema = builder.build().with_context(|_| {
393 let column_names = requests
394 .iter()
395 .map(|request| request.column_name())
396 .collect::<Vec<_>>();
397 error::SchemaBuildSnafu {
398 msg: format!(
399 "Table {table_name} cannot set index options with columns {column_names:?}",
400 ),
401 }
402 })?;
403 let _ = meta_builder
404 .schema(Arc::new(new_schema))
405 .primary_key_indices(self.primary_key_indices.clone());
406
407 Ok(meta_builder)
408 }
409
410 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
411 match request {
412 SetIndexOption::Fulltext {
413 column_name,
414 options,
415 } => {
416 ensure!(
417 column_schema.data_type.is_string(),
418 error::InvalidColumnOptionSnafu {
419 column_name,
420 msg: "FULLTEXT index only supports string type",
421 }
422 );
423
424 let current_fulltext_options = column_schema
425 .fulltext_options()
426 .context(error::SetFulltextOptionsSnafu { column_name })?;
427 set_column_fulltext_options(
428 column_schema,
429 column_name,
430 options,
431 current_fulltext_options,
432 )?;
433 }
434 SetIndexOption::Inverted { column_name } => {
435 debug_assert_eq!(column_schema.name, *column_name);
436 column_schema.set_inverted_index(true);
437 }
438 SetIndexOption::Skipping {
439 column_name,
440 options,
441 } => {
442 set_column_skipping_index_options(column_schema, column_name, options)?;
443 }
444 }
445
446 Ok(())
447 }
448
449 fn unset_index(
450 &self,
451 column_schema: &mut ColumnSchema,
452 request: &UnsetIndexOption,
453 ) -> Result<()> {
454 match request {
455 UnsetIndexOption::Fulltext { column_name } => {
456 let current_fulltext_options = column_schema
457 .fulltext_options()
458 .context(error::SetFulltextOptionsSnafu { column_name })?;
459 unset_column_fulltext_options(
460 column_schema,
461 column_name,
462 current_fulltext_options.clone(),
463 )?
464 }
465 UnsetIndexOption::Inverted { .. } => {
466 column_schema.set_inverted_index(false);
467 }
468 UnsetIndexOption::Skipping { column_name } => {
469 unset_column_skipping_index_options(column_schema, column_name)?;
470 }
471 }
472
473 Ok(())
474 }
475
476 pub fn alloc_new_column(
481 &mut self,
482 table_name: &str,
483 new_column: &ColumnSchema,
484 ) -> Result<ColumnDescriptor> {
485 let desc = ColumnDescriptorBuilder::new(
486 self.next_column_id as ColumnId,
487 &new_column.name,
488 new_column.data_type.clone(),
489 )
490 .is_nullable(new_column.is_nullable())
491 .default_constraint(new_column.default_constraint().cloned())
492 .build()
493 .context(error::BuildColumnDescriptorSnafu {
494 table_name,
495 column_name: &new_column.name,
496 })?;
497
498 self.next_column_id += 1;
500
501 Ok(desc)
502 }
503
504 fn new_meta_builder(&self) -> TableMetaBuilder {
506 let mut builder = TableMetaBuilder::from(self);
507 builder.value_indices = None;
509 builder
510 }
511
    /// Returns a builder describing the table after the add-column `requests` are applied.
    ///
    /// A request naming a column that already exists is accepted only when
    /// `add_if_not_exists` is set and the data types match; such a request is then
    /// skipped. A genuinely new column must be requested at most once and must be
    /// nullable or carry a default constraint.
    ///
    /// The new column order is: columns placed `FIRST` (in reverse request order), then
    /// the existing columns in their original order, each followed by the columns placed
    /// `AFTER` it (in reverse request order), then the columns without a location (in
    /// request order). Primary key and partition key indices are recomputed for the new
    /// layout, and the schema version is bumped by one.
    ///
    /// # Errors
    ///
    /// Fails on an invalid request as described above, when a location refers to an
    /// unknown column, or when the new schema cannot be built.
    fn add_columns(
        &self,
        table_name: &str,
        requests: &[AddColumnRequest],
    ) -> Result<TableMetaBuilder> {
        let table_schema = &self.schema;
        let mut meta_builder = self.new_meta_builder();
        let original_primary_key_indices: HashSet<&usize> =
            self.primary_key_indices.iter().collect();

        // Names of the new columns seen so far, used to reject duplicates.
        let mut names = HashSet::with_capacity(requests.len());
        // Requests that really add a column (existing columns are filtered out).
        let mut new_columns = Vec::with_capacity(requests.len());
        for col_to_add in requests {
            if let Some(column_schema) =
                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
            {
                // The column already exists: only tolerated with `add_if_not_exists`.
                ensure!(
                    col_to_add.add_if_not_exists,
                    error::ColumnExistsSnafu {
                        table_name,
                        column_name: &col_to_add.column_schema.name
                    },
                );

                // ... and only if the existing column has the requested type.
                ensure!(
                    column_schema.data_type == col_to_add.column_schema.data_type,
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "column {} already exists with different type {:?}",
                            col_to_add.column_schema.name, column_schema.data_type,
                        ),
                    }
                );
            } else {
                // A new column: `insert` returns false when the name was already requested.
                ensure!(
                    names.insert(&col_to_add.column_schema.name),
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "add column {} more than once",
                            col_to_add.column_schema.name
                        ),
                    }
                );

                // A new column must be nullable or have a default constraint.
                ensure!(
                    col_to_add.column_schema.is_nullable()
                        || col_to_add.column_schema.default_constraint().is_some(),
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "no default value for column {}",
                            col_to_add.column_schema.name
                        ),
                    },
                );

                new_columns.push(col_to_add.clone());
            }
        }
        // From here on only the requests that add a new column are considered.
        let requests = &new_columns[..];

        let SplitResult {
            columns_at_first,
            columns_at_after,
            columns_at_last,
            column_names,
        } = self.split_requests_by_column_location(table_name, requests)?;
        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
        // Columns placed FIRST, in reverse request order.
        columns_at_first.iter().rev().for_each(|request| {
            if request.is_key {
                // `columns.len()` is the position the column is about to occupy.
                primary_key_indices.push(columns.len());
            }
            columns.push(request.column_schema.clone());
        });
        // Existing columns in their original order, each followed by the columns placed
        // AFTER it.
        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
            if original_primary_key_indices.contains(&index) {
                // Existing key column: record its position in the new layout.
                primary_key_indices.push(columns.len());
            }
            columns.push(column_schema.clone());
            if let Some(requests) = columns_at_after.get(&column_schema.name) {
                requests.iter().rev().for_each(|request| {
                    if request.is_key {
                        primary_key_indices.push(columns.len());
                    }
                    columns.push(request.column_schema.clone());
                });
            }
        }
        // Columns without a location go last, in request order.
        columns_at_last.iter().for_each(|request| {
            if request.is_key {
                primary_key_indices.push(columns.len());
            }
            columns.push(request.column_schema.clone());
        });

        let mut builder = SchemaBuilder::try_from(columns)
            .with_context(|_| error::SchemaBuildSnafu {
                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
            })?
            .version(table_schema.version() + 1);
        // Carry the existing schema metadata over unchanged.
        for (k, v) in table_schema.metadata().iter() {
            builder = builder.add_metadata(k, v);
        }
        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
        })?;

        // Re-resolve the partition columns by name against the new layout.
        let partition_key_indices = self
            .partition_key_indices
            .iter()
            .map(|idx| table_schema.column_name_by_index(*idx))
            // Every existing column is copied into the new schema, so the lookup succeeds.
            .map(|name| new_schema.column_index_by_name(name).unwrap())
            .collect();

        let _ = meta_builder
            .schema(Arc::new(new_schema))
            .primary_key_indices(primary_key_indices)
            .partition_key_indices(partition_key_indices);

        Ok(meta_builder)
    }
650
651 fn remove_columns(
652 &self,
653 table_name: &str,
654 column_names: &[String],
655 ) -> Result<TableMetaBuilder> {
656 let table_schema = &self.schema;
657 let column_names: HashSet<_> = column_names.iter().collect();
658 let mut meta_builder = self.new_meta_builder();
659
660 let timestamp_index = table_schema.timestamp_index();
661 for column_name in &column_names {
663 if let Some(index) = table_schema.column_index_by_name(column_name) {
664 ensure!(
667 !self.primary_key_indices.contains(&index),
668 error::RemoveColumnInIndexSnafu {
669 column_name: *column_name,
670 table_name,
671 }
672 );
673
674 ensure!(
675 !self.partition_key_indices.contains(&index),
676 error::RemovePartitionColumnSnafu {
677 column_name: *column_name,
678 table_name,
679 }
680 );
681
682 if let Some(ts_index) = timestamp_index {
683 ensure!(
685 index != ts_index,
686 error::RemoveColumnInIndexSnafu {
687 column_name: table_schema.column_name_by_index(ts_index),
688 table_name,
689 }
690 );
691 }
692 } else {
693 return error::ColumnNotExistsSnafu {
694 column_name: *column_name,
695 table_name,
696 }
697 .fail()?;
698 }
699 }
700
701 let columns: Vec<_> = table_schema
703 .column_schemas()
704 .iter()
705 .filter(|column_schema| !column_names.contains(&column_schema.name))
706 .cloned()
707 .collect();
708
709 let mut builder = SchemaBuilder::try_from_columns(columns)
710 .with_context(|_| error::SchemaBuildSnafu {
711 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
712 })?
713 .version(table_schema.version() + 1);
715 for (k, v) in table_schema.metadata().iter() {
716 builder = builder.add_metadata(k, v);
717 }
718 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
719 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
720 })?;
721
722 let primary_key_indices = self
724 .primary_key_indices
725 .iter()
726 .map(|idx| table_schema.column_name_by_index(*idx))
727 .map(|name| new_schema.column_index_by_name(name).unwrap())
729 .collect();
730
731 let partition_key_indices = self
732 .partition_key_indices
733 .iter()
734 .map(|idx| table_schema.column_name_by_index(*idx))
735 .map(|name| new_schema.column_index_by_name(name).unwrap())
737 .collect();
738
739 let _ = meta_builder
740 .schema(Arc::new(new_schema))
741 .primary_key_indices(primary_key_indices)
742 .partition_key_indices(partition_key_indices);
743
744 Ok(meta_builder)
745 }
746
    /// Returns a builder describing the table after the column data types in `requests`
    /// are changed.
    ///
    /// Every request is validated in order. The column must exist, must not be a primary
    /// key column or the timestamp index, must be requested only once, must be castable
    /// to the target type, and must be nullable. For each changed column the existing
    /// default constraint, if any, is cast to the new type. The new schema keeps the
    /// current metadata and gets the current version plus one.
    ///
    /// # Errors
    ///
    /// Fails on the first validation that does not hold, when a default value cannot be
    /// cast or set, or when the new schema cannot be built.
    fn modify_column_types(
        &self,
        table_name: &str,
        requests: &[ModifyColumnTypeRequest],
    ) -> Result<TableMetaBuilder> {
        let table_schema = &self.schema;
        let mut meta_builder = self.new_meta_builder();

        // Validated requests keyed by column name.
        let mut modify_column_types = HashMap::with_capacity(requests.len());
        let timestamp_index = table_schema.timestamp_index();

        for col_to_change in requests {
            let change_column_name = &col_to_change.column_name;

            let index = table_schema
                .column_index_by_name(change_column_name)
                .with_context(|| error::ColumnNotExistsSnafu {
                    column_name: change_column_name,
                    table_name,
                })?;

            let column = &table_schema.column_schemas()[index];

            // Primary key columns cannot change type.
            ensure!(
                !self.primary_key_indices.contains(&index),
                error::InvalidAlterRequestSnafu {
                    table: table_name,
                    err: format!(
                        "Not allowed to change primary key index column '{}'",
                        column.name
                    )
                }
            );

            // The timestamp index column cannot change type.
            if let Some(ts_index) = timestamp_index {
                ensure!(
                    index != ts_index,
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "Not allowed to change timestamp index column '{}' datatype",
                            column.name
                        )
                    }
                );
            }

            // `insert` returns the previous entry, so `Some` means a duplicate request.
            ensure!(
                modify_column_types
                    .insert(&col_to_change.column_name, col_to_change)
                    .is_none(),
                error::InvalidAlterRequestSnafu {
                    table: table_name,
                    err: format!(
                        "change column datatype {} more than once",
                        col_to_change.column_name
                    ),
                }
            );

            // The current type must be castable to the target type.
            ensure!(
                column
                    .data_type
                    .can_arrow_type_cast_to(&col_to_change.target_type),
                error::InvalidAlterRequestSnafu {
                    table: table_name,
                    err: format!(
                        "column '{}' cannot be cast automatically to type '{}'",
                        col_to_change.column_name, col_to_change.target_type,
                    ),
                }
            );

            // Only nullable columns may change type.
            ensure!(
                column.is_nullable(),
                error::InvalidAlterRequestSnafu {
                    table: table_name,
                    err: format!(
                        "column '{}' must be nullable to ensure safe conversion.",
                        col_to_change.column_name,
                    ),
                }
            );
        }
        // Copy every column, switching the type (and default) of the requested ones.
        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
        for mut column in table_schema.column_schemas().iter().cloned() {
            if let Some(change_column) = modify_column_types.get(&column.name) {
                column.data_type = change_column.target_type.clone();
                // Cast the existing default constraint, if any, to the new type.
                let new_default = if let Some(default_value) = column.default_constraint() {
                    Some(
                        default_value
                            .cast_to_datatype(&change_column.target_type)
                            .with_context(|_| error::CastDefaultValueSnafu {
                                reason: format!(
                                    "Failed to cast default value from {:?} to type {:?}",
                                    default_value, &change_column.target_type
                                ),
                            })?,
                    )
                } else {
                    None
                };
                column = column
                    .clone()
                    .with_default_constraint(new_default.clone())
                    .with_context(|_| error::CastDefaultValueSnafu {
                        reason: format!("Failed to set new default: {:?}", new_default),
                    })?;
            }
            columns.push(column)
        }

        let mut builder = SchemaBuilder::try_from_columns(columns)
            .with_context(|_| error::SchemaBuildSnafu {
                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
            })?
            .version(table_schema.version() + 1);
        // Carry the existing schema metadata over unchanged.
        for (k, v) in table_schema.metadata().iter() {
            builder = builder.add_metadata(k, v);
        }
        let new_schema = builder.build().with_context(|_| {
            let column_names: Vec<_> = requests
                .iter()
                .map(|request| &request.column_name)
                .collect();

            error::SchemaBuildSnafu {
                msg: format!(
                    "Table {table_name} cannot change datatype with columns {column_names:?}"
                ),
            }
        })?;

        // The column layout is unchanged, so the primary key indices stay as they are.
        let _ = meta_builder
            .schema(Arc::new(new_schema))
            .primary_key_indices(self.primary_key_indices.clone());

        Ok(meta_builder)
    }
890
891 fn split_requests_by_column_location<'a>(
893 &self,
894 table_name: &str,
895 requests: &'a [AddColumnRequest],
896 ) -> Result<SplitResult<'a>> {
897 let table_schema = &self.schema;
898 let mut columns_at_first = Vec::new();
899 let mut columns_at_after = HashMap::new();
900 let mut columns_at_last = Vec::new();
901 let mut column_names = Vec::with_capacity(requests.len());
902 for request in requests {
903 let column_name = &request.column_schema.name;
905 column_names.push(column_name.clone());
906 ensure!(
907 table_schema.column_schema_by_name(column_name).is_none(),
908 error::ColumnExistsSnafu {
909 column_name,
910 table_name,
911 }
912 );
913 match request.location.as_ref() {
914 Some(AddColumnLocation::First) => {
915 columns_at_first.push(request);
916 }
917 Some(AddColumnLocation::After { column_name }) => {
918 ensure!(
919 table_schema.column_schema_by_name(column_name).is_some(),
920 error::ColumnNotExistsSnafu {
921 column_name,
922 table_name,
923 }
924 );
925 columns_at_after
926 .entry(column_name.clone())
927 .or_insert(Vec::new())
928 .push(request);
929 }
930 None => {
931 columns_at_last.push(request);
932 }
933 }
934 }
935 Ok(SplitResult {
936 columns_at_first,
937 columns_at_after,
938 columns_at_last,
939 column_names,
940 })
941 }
942
943 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
944 let table_schema = &self.schema;
945 let mut meta_builder = self.new_meta_builder();
946 let mut columns = Vec::with_capacity(table_schema.num_columns());
947 for column_schema in table_schema.column_schemas() {
948 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
949 ensure!(
951 column_schema.default_constraint().is_some(),
952 error::InvalidAlterRequestSnafu {
953 table: table_name,
954 err: format!("column {name} does not have a default value"),
955 }
956 );
957 if !column_schema.is_nullable() {
958 return error::InvalidAlterRequestSnafu {
959 table: table_name,
960 err: format!(
961 "column {name} is not nullable and `default` cannot be dropped",
962 ),
963 }
964 .fail();
965 }
966 let new_column_schema = column_schema.clone();
967 let new_column_schema = new_column_schema
968 .with_default_constraint(None)
969 .with_context(|_| error::SchemaBuildSnafu {
970 msg: format!("Table {table_name} cannot drop default values"),
971 })?;
972 columns.push(new_column_schema);
973 } else {
974 columns.push(column_schema.clone());
975 }
976 }
977
978 let mut builder = SchemaBuilder::try_from_columns(columns)
979 .with_context(|_| error::SchemaBuildSnafu {
980 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
981 })?
982 .version(table_schema.version() + 1);
984 for (k, v) in table_schema.metadata().iter() {
985 builder = builder.add_metadata(k, v);
986 }
987 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
988 msg: format!("Table {table_name} cannot drop default values"),
989 })?;
990
991 let _ = meta_builder.schema(Arc::new(new_schema));
992
993 Ok(meta_builder)
994 }
995
996 fn set_defaults(
997 &self,
998 table_name: &str,
999 set_defaults: &[SetDefaultRequest],
1000 ) -> Result<TableMetaBuilder> {
1001 let table_schema = &self.schema;
1002 let mut meta_builder = self.new_meta_builder();
1003 let mut columns = Vec::with_capacity(table_schema.num_columns());
1004 for column_schema in table_schema.column_schemas() {
1005 if let Some(set_default) = set_defaults
1006 .iter()
1007 .find(|s| s.column_name == column_schema.name)
1008 {
1009 let new_column_schema = column_schema.clone();
1010 let new_column_schema = new_column_schema
1011 .with_default_constraint(set_default.default_constraint.clone())
1012 .with_context(|_| error::SchemaBuildSnafu {
1013 msg: format!("Table {table_name} cannot set default values"),
1014 })?;
1015 columns.push(new_column_schema);
1016 } else {
1017 columns.push(column_schema.clone());
1018 }
1019 }
1020
1021 let mut builder = SchemaBuilder::try_from_columns(columns)
1022 .with_context(|_| error::SchemaBuildSnafu {
1023 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1024 })?
1025 .version(table_schema.version() + 1);
1027 for (k, v) in table_schema.metadata().iter() {
1028 builder = builder.add_metadata(k, v);
1029 }
1030 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1031 msg: format!("Table {table_name} cannot set default values"),
1032 })?;
1033
1034 let _ = meta_builder.schema(Arc::new(new_schema));
1035
1036 Ok(meta_builder)
1037 }
1038}
1039
/// Full description of a table: identity, naming, and metadata.
///
/// A `TableInfoBuilder` is derived for this struct using the owned setter pattern.
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table; defaults to `TableIdent::default()`.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table.
    #[builder(setter(into))]
    pub name: String,
    /// Optional description of the table; defaults to `None`.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Catalog the table belongs to; defaults to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Schema (database) the table belongs to; defaults to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table.
    pub meta: TableMeta,
    /// Kind of the table; defaults to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
1060
/// Shared, reference-counted [`TableInfo`].
pub type TableInfoRef = Arc<TableInfo>;
1062
1063impl TableInfo {
1064 pub fn table_id(&self) -> TableId {
1065 self.ident.table_id
1066 }
1067
1068 pub fn region_ids(&self) -> Vec<RegionId> {
1069 self.meta
1070 .region_numbers
1071 .iter()
1072 .map(|id| RegionId::new(self.table_id(), *id))
1073 .collect()
1074 }
1075 pub fn full_table_name(&self) -> String {
1077 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1078 }
1079
1080 pub fn get_db_string(&self) -> String {
1081 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1082 }
1083
1084 pub fn is_physical_table(&self) -> bool {
1086 self.meta
1087 .options
1088 .extra_options
1089 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1090 }
1091
1092 pub fn is_ttl_instant_table(&self) -> bool {
1094 self.meta
1095 .options
1096 .ttl
1097 .map(|t| t.is_instant())
1098 .unwrap_or(false)
1099 }
1100}
1101
1102impl TableInfoBuilder {
1103 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1104 Self {
1105 name: Some(name.into()),
1106 meta: Some(meta),
1107 ..Default::default()
1108 }
1109 }
1110
1111 pub fn table_id(mut self, id: TableId) -> Self {
1112 let ident = self.ident.get_or_insert_with(TableIdent::default);
1113 ident.table_id = id;
1114 self
1115 }
1116
1117 pub fn table_version(mut self, version: TableVersion) -> Self {
1118 let ident = self.ident.get_or_insert_with(TableIdent::default);
1119 ident.version = version;
1120 self
1121 }
1122}
1123
1124impl TableIdent {
1125 pub fn new(table_id: TableId) -> Self {
1126 Self {
1127 table_id,
1128 version: 0,
1129 }
1130 }
1131}
1132
1133impl From<TableId> for TableIdent {
1134 fn from(table_id: TableId) -> Self {
1135 Self::new(table_id)
1136 }
1137}
1138
/// Serde-serializable counterpart of [`TableMeta`], holding the schema as a [`RawSchema`].
///
/// Converts from `TableMeta` infallibly and back via `TryFrom`; all fields other than
/// `schema` are copied as-is in both directions.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct RawTableMeta {
    /// Raw form of the table schema.
    pub schema: RawSchema,
    /// Positions of the primary key columns in the schema.
    pub primary_key_indices: Vec<usize>,
    /// Positions of the value columns in the schema.
    pub value_indices: Vec<usize>,
    /// Name of the table engine.
    pub engine: String,
    /// Id to use for the next allocated column.
    pub next_column_id: ColumnId,
    /// Region numbers of the table.
    pub region_numbers: Vec<u32>,
    /// Table options.
    pub options: TableOptions,
    /// Creation time of the table.
    pub created_on: DateTime<Utc>,
    /// Positions of the partition columns; empty when absent from the serialized data.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; empty when absent from the serialized data.
    #[serde(default)]
    pub column_ids: Vec<ColumnId>,
}
1165
1166impl From<TableMeta> for RawTableMeta {
1167 fn from(meta: TableMeta) -> RawTableMeta {
1168 RawTableMeta {
1169 schema: RawSchema::from(&*meta.schema),
1170 primary_key_indices: meta.primary_key_indices,
1171 value_indices: meta.value_indices,
1172 engine: meta.engine,
1173 next_column_id: meta.next_column_id,
1174 region_numbers: meta.region_numbers,
1175 options: meta.options,
1176 created_on: meta.created_on,
1177 partition_key_indices: meta.partition_key_indices,
1178 column_ids: meta.column_ids,
1179 }
1180 }
1181}
1182
1183impl TryFrom<RawTableMeta> for TableMeta {
1184 type Error = ConvertError;
1185
1186 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1187 Ok(TableMeta {
1188 schema: Arc::new(Schema::try_from(raw.schema)?),
1189 primary_key_indices: raw.primary_key_indices,
1190 value_indices: raw.value_indices,
1191 engine: raw.engine,
1192 region_numbers: raw.region_numbers,
1193 next_column_id: raw.next_column_id,
1194 options: raw.options,
1195 created_on: raw.created_on,
1196 partition_key_indices: raw.partition_key_indices,
1197 column_ids: raw.column_ids,
1198 })
1199 }
1200}
1201
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Serializable counterpart of [`TableInfo`].
///
/// It carries the same fields as `TableInfo`, with the metadata stored as a
/// [`RawTableMeta`]. Conversions exist in both directions: `From<TableInfo>`
/// and the fallible `TryFrom<RawTableInfo>`.
pub struct RawTableInfo {
    /// Table identifier (table id plus version).
    pub ident: TableIdent,
    /// Table name; the last component of the reference built by `table_ref`.
    pub name: String,
    /// Optional description. NOTE(review): presumably free-form, user-facing
    /// text; confirm.
    pub desc: Option<String>,
    /// Catalog name; the first component of the reference built by `table_ref`.
    pub catalog_name: String,
    /// Schema name; the second component of the reference built by `table_ref`.
    pub schema_name: String,
    /// Raw table metadata.
    pub meta: RawTableMeta,
    /// Kind of the table.
    pub table_type: TableType,
}
1213
1214impl RawTableInfo {
1215 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1219 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1220 None
1221 } else {
1222 Some(
1223 self.meta
1224 .column_ids
1225 .iter()
1226 .enumerate()
1227 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1228 .collect(),
1229 )
1230 }
1231 }
1232
1233 pub fn sort_columns(&mut self) {
1235 let column_schemas = &self.meta.schema.column_schemas;
1236 let primary_keys = self
1237 .meta
1238 .primary_key_indices
1239 .iter()
1240 .map(|index| column_schemas[*index].name.clone())
1241 .collect::<HashSet<_>>();
1242
1243 let name_to_ids = self.name_to_ids().unwrap_or_default();
1244 self.meta
1245 .schema
1246 .column_schemas
1247 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1248
1249 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1251 let mut timestamp_index = None;
1252 let mut value_indices =
1253 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1254 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1255 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1256 if primary_keys.contains(&column_schema.name) {
1257 primary_key_indices.push(index);
1258 } else if column_schema.is_time_index() {
1259 value_indices.push(index);
1260 timestamp_index = Some(index);
1261 } else {
1262 value_indices.push(index);
1263 }
1264 if let Some(id) = name_to_ids.get(&column_schema.name) {
1265 column_ids.push(*id);
1266 }
1267 }
1268
1269 self.meta.schema.timestamp_index = timestamp_index;
1271 self.meta.primary_key_indices = primary_key_indices;
1272 self.meta.value_indices = value_indices;
1273 self.meta.column_ids = column_ids;
1274 }
1275
1276 pub fn to_region_options(&self) -> HashMap<String, String> {
1280 HashMap::from(&self.meta.options)
1281 }
1282
1283 pub fn table_ref(&self) -> TableReference {
1285 TableReference::full(
1286 self.catalog_name.as_str(),
1287 self.schema_name.as_str(),
1288 self.name.as_str(),
1289 )
1290 }
1291}
1292
1293impl From<TableInfo> for RawTableInfo {
1294 fn from(info: TableInfo) -> RawTableInfo {
1295 RawTableInfo {
1296 ident: info.ident,
1297 name: info.name,
1298 desc: info.desc,
1299 catalog_name: info.catalog_name,
1300 schema_name: info.schema_name,
1301 meta: RawTableMeta::from(info.meta),
1302 table_type: info.table_type,
1303 }
1304 }
1305}
1306
1307impl TryFrom<RawTableInfo> for TableInfo {
1308 type Error = ConvertError;
1309
1310 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1311 Ok(TableInfo {
1312 ident: raw.ident,
1313 name: raw.name,
1314 desc: raw.desc,
1315 catalog_name: raw.catalog_name,
1316 schema_name: raw.schema_name,
1317 meta: TableMeta::try_from(raw.meta)?,
1318 table_type: raw.table_type,
1319 })
1320 }
1321}
1322
1323fn set_column_fulltext_options(
1332 column_schema: &mut ColumnSchema,
1333 column_name: &str,
1334 options: &FulltextOptions,
1335 current_options: Option<FulltextOptions>,
1336) -> Result<()> {
1337 if let Some(current_options) = current_options {
1338 ensure!(
1339 current_options.analyzer == options.analyzer
1340 && current_options.case_sensitive == options.case_sensitive,
1341 error::InvalidColumnOptionSnafu {
1342 column_name,
1343 msg: format!(
1344 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1345 current_options.analyzer, current_options.case_sensitive
1346 ),
1347 }
1348 );
1349 }
1350
1351 column_schema
1352 .set_fulltext_options(options)
1353 .context(error::SetFulltextOptionsSnafu { column_name })?;
1354
1355 Ok(())
1356}
1357
1358fn unset_column_fulltext_options(
1359 column_schema: &mut ColumnSchema,
1360 column_name: &str,
1361 current_options: Option<FulltextOptions>,
1362) -> Result<()> {
1363 ensure!(
1364 current_options
1365 .as_ref()
1366 .is_some_and(|options| options.enable),
1367 error::InvalidColumnOptionSnafu {
1368 column_name,
1369 msg: "FULLTEXT index already disabled".to_string(),
1370 }
1371 );
1372
1373 let mut options = current_options.unwrap();
1374 options.enable = false;
1375 column_schema
1376 .set_fulltext_options(&options)
1377 .context(error::SetFulltextOptionsSnafu { column_name })?;
1378
1379 Ok(())
1380}
1381
1382fn set_column_skipping_index_options(
1383 column_schema: &mut ColumnSchema,
1384 column_name: &str,
1385 options: &SkippingIndexOptions,
1386) -> Result<()> {
1387 column_schema
1388 .set_skipping_options(options)
1389 .context(error::SetSkippingOptionsSnafu { column_name })?;
1390
1391 Ok(())
1392}
1393
1394fn unset_column_skipping_index_options(
1395 column_schema: &mut ColumnSchema,
1396 column_name: &str,
1397) -> Result<()> {
1398 column_schema
1399 .unset_skipping_options()
1400 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1401 Ok(())
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406 use std::assert_matches::assert_matches;
1407
1408 use common_error::ext::ErrorExt;
1409 use common_error::status_code::StatusCode;
1410 use datatypes::data_type::ConcreteDataType;
1411 use datatypes::schema::{
1412 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1413 };
1414
1415 use super::*;
1416 use crate::Error;
1417
1418 fn new_test_schema() -> Schema {
1420 let column_schemas = vec![
1421 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1422 ColumnSchema::new(
1423 "ts",
1424 ConcreteDataType::timestamp_millisecond_datatype(),
1425 false,
1426 )
1427 .with_time_index(true),
1428 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1429 ];
1430 SchemaBuilder::try_from(column_schemas)
1431 .unwrap()
1432 .version(123)
1433 .build()
1434 .unwrap()
1435 }
1436
1437 #[test]
1438 fn test_raw_convert() {
1439 let schema = Arc::new(new_test_schema());
1440 let meta = TableMetaBuilder::empty()
1441 .schema(schema)
1442 .primary_key_indices(vec![0])
1443 .engine("engine")
1444 .next_column_id(3)
1445 .build()
1446 .unwrap();
1447 let info = TableInfoBuilder::default()
1448 .table_id(10)
1449 .table_version(5)
1450 .name("mytable")
1451 .meta(meta)
1452 .build()
1453 .unwrap();
1454
1455 let raw = RawTableInfo::from(info.clone());
1456 let info_new = TableInfo::try_from(raw).unwrap();
1457
1458 assert_eq!(info, info_new);
1459 }
1460
1461 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1462 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1463 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1464 let alter_kind = AlterKind::AddColumns {
1465 columns: vec![
1466 AddColumnRequest {
1467 column_schema: new_tag,
1468 is_key: true,
1469 location: None,
1470 add_if_not_exists: false,
1471 },
1472 AddColumnRequest {
1473 column_schema: new_field,
1474 is_key: false,
1475 location: None,
1476 add_if_not_exists: false,
1477 },
1478 ],
1479 };
1480
1481 let builder = meta
1482 .builder_with_alter_kind("my_table", &alter_kind)
1483 .unwrap();
1484 builder.build().unwrap()
1485 }
1486
1487 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1488 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1489 let new_field = ColumnSchema::new(
1490 "my_field_after_ts",
1491 ConcreteDataType::string_datatype(),
1492 true,
1493 );
1494 let yet_another_field = ColumnSchema::new(
1495 "yet_another_field_after_ts",
1496 ConcreteDataType::int64_datatype(),
1497 true,
1498 );
1499 let alter_kind = AlterKind::AddColumns {
1500 columns: vec![
1501 AddColumnRequest {
1502 column_schema: new_tag,
1503 is_key: true,
1504 location: Some(AddColumnLocation::First),
1505 add_if_not_exists: false,
1506 },
1507 AddColumnRequest {
1508 column_schema: new_field,
1509 is_key: false,
1510 location: Some(AddColumnLocation::After {
1511 column_name: "ts".to_string(),
1512 }),
1513 add_if_not_exists: false,
1514 },
1515 AddColumnRequest {
1516 column_schema: yet_another_field,
1517 is_key: true,
1518 location: Some(AddColumnLocation::After {
1519 column_name: "ts".to_string(),
1520 }),
1521 add_if_not_exists: false,
1522 },
1523 ],
1524 };
1525
1526 let builder = meta
1527 .builder_with_alter_kind("my_table", &alter_kind)
1528 .unwrap();
1529 builder.build().unwrap()
1530 }
1531
1532 #[test]
1533 fn test_add_columns() {
1534 let schema = Arc::new(new_test_schema());
1535 let meta = TableMetaBuilder::empty()
1536 .schema(schema)
1537 .primary_key_indices(vec![0])
1538 .engine("engine")
1539 .next_column_id(3)
1540 .build()
1541 .unwrap();
1542
1543 let new_meta = add_columns_to_meta(&meta);
1544 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1545
1546 let names: Vec<String> = new_meta
1547 .schema
1548 .column_schemas()
1549 .iter()
1550 .map(|column_schema| column_schema.name.clone())
1551 .collect();
1552 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1553 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1554 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1555 }
1556
1557 #[test]
1558 fn test_add_columns_multiple_times() {
1559 let schema = Arc::new(new_test_schema());
1560 let meta = TableMetaBuilder::empty()
1561 .schema(schema)
1562 .primary_key_indices(vec![0])
1563 .engine("engine")
1564 .next_column_id(3)
1565 .build()
1566 .unwrap();
1567
1568 let alter_kind = AlterKind::AddColumns {
1569 columns: vec![
1570 AddColumnRequest {
1571 column_schema: ColumnSchema::new(
1572 "col3",
1573 ConcreteDataType::int32_datatype(),
1574 true,
1575 ),
1576 is_key: true,
1577 location: None,
1578 add_if_not_exists: true,
1579 },
1580 AddColumnRequest {
1581 column_schema: ColumnSchema::new(
1582 "col3",
1583 ConcreteDataType::int32_datatype(),
1584 true,
1585 ),
1586 is_key: true,
1587 location: None,
1588 add_if_not_exists: true,
1589 },
1590 ],
1591 };
1592 let err = meta
1593 .builder_with_alter_kind("my_table", &alter_kind)
1594 .err()
1595 .unwrap();
1596 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1597 }
1598
1599 #[test]
1600 fn test_remove_columns() {
1601 let schema = Arc::new(new_test_schema());
1602 let meta = TableMetaBuilder::empty()
1603 .schema(schema.clone())
1604 .primary_key_indices(vec![0])
1605 .engine("engine")
1606 .next_column_id(3)
1607 .build()
1608 .unwrap();
1609 let meta = add_columns_to_meta(&meta);
1611
1612 let alter_kind = AlterKind::DropColumns {
1613 names: vec![String::from("col2"), String::from("my_field")],
1614 };
1615 let new_meta = meta
1616 .builder_with_alter_kind("my_table", &alter_kind)
1617 .unwrap()
1618 .build()
1619 .unwrap();
1620
1621 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1622
1623 let names: Vec<String> = new_meta
1624 .schema
1625 .column_schemas()
1626 .iter()
1627 .map(|column_schema| column_schema.name.clone())
1628 .collect();
1629 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1630 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1631 assert_eq!(&[1], &new_meta.value_indices[..]);
1632 assert_eq!(
1633 schema.timestamp_column(),
1634 new_meta.schema.timestamp_column()
1635 );
1636 }
1637
1638 #[test]
1639 fn test_remove_multiple_columns_before_timestamp() {
1640 let column_schemas = vec![
1641 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1642 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1643 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1644 ColumnSchema::new(
1645 "ts",
1646 ConcreteDataType::timestamp_millisecond_datatype(),
1647 false,
1648 )
1649 .with_time_index(true),
1650 ];
1651 let schema = Arc::new(
1652 SchemaBuilder::try_from(column_schemas)
1653 .unwrap()
1654 .version(123)
1655 .build()
1656 .unwrap(),
1657 );
1658 let meta = TableMetaBuilder::empty()
1659 .schema(schema.clone())
1660 .primary_key_indices(vec![1])
1661 .engine("engine")
1662 .next_column_id(4)
1663 .build()
1664 .unwrap();
1665
1666 let alter_kind = AlterKind::DropColumns {
1668 names: vec![String::from("col3"), String::from("col1")],
1669 };
1670 let new_meta = meta
1671 .builder_with_alter_kind("my_table", &alter_kind)
1672 .unwrap()
1673 .build()
1674 .unwrap();
1675
1676 let names: Vec<String> = new_meta
1677 .schema
1678 .column_schemas()
1679 .iter()
1680 .map(|column_schema| column_schema.name.clone())
1681 .collect();
1682 assert_eq!(&["col2", "ts"], &names[..]);
1683 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1684 assert_eq!(&[1], &new_meta.value_indices[..]);
1685 assert_eq!(
1686 schema.timestamp_column(),
1687 new_meta.schema.timestamp_column()
1688 );
1689 }
1690
1691 #[test]
1692 fn test_add_existing_column() {
1693 let schema = Arc::new(new_test_schema());
1694 let meta = TableMetaBuilder::empty()
1695 .schema(schema)
1696 .primary_key_indices(vec![0])
1697 .engine("engine")
1698 .next_column_id(3)
1699 .build()
1700 .unwrap();
1701
1702 let alter_kind = AlterKind::AddColumns {
1703 columns: vec![AddColumnRequest {
1704 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1705 is_key: false,
1706 location: None,
1707 add_if_not_exists: false,
1708 }],
1709 };
1710
1711 let err = meta
1712 .builder_with_alter_kind("my_table", &alter_kind)
1713 .err()
1714 .unwrap();
1715 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1716
1717 let alter_kind = AlterKind::AddColumns {
1719 columns: vec![AddColumnRequest {
1720 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1721 is_key: true,
1722 location: None,
1723 add_if_not_exists: true,
1724 }],
1725 };
1726 let new_meta = meta
1727 .builder_with_alter_kind("my_table", &alter_kind)
1728 .unwrap()
1729 .build()
1730 .unwrap();
1731 assert_eq!(
1732 meta.schema.column_schemas(),
1733 new_meta.schema.column_schemas()
1734 );
1735 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1736 }
1737
1738 #[test]
1739 fn test_add_different_type_column() {
1740 let schema = Arc::new(new_test_schema());
1741 let meta = TableMetaBuilder::empty()
1742 .schema(schema)
1743 .primary_key_indices(vec![0])
1744 .engine("engine")
1745 .next_column_id(3)
1746 .build()
1747 .unwrap();
1748
1749 let alter_kind = AlterKind::AddColumns {
1751 columns: vec![AddColumnRequest {
1752 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1753 is_key: false,
1754 location: None,
1755 add_if_not_exists: true,
1756 }],
1757 };
1758 let err = meta
1759 .builder_with_alter_kind("my_table", &alter_kind)
1760 .err()
1761 .unwrap();
1762 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1763 }
1764
1765 #[test]
1766 fn test_add_invalid_column() {
1767 let schema = Arc::new(new_test_schema());
1768 let meta = TableMetaBuilder::empty()
1769 .schema(schema)
1770 .primary_key_indices(vec![0])
1771 .engine("engine")
1772 .next_column_id(3)
1773 .build()
1774 .unwrap();
1775
1776 let alter_kind = AlterKind::AddColumns {
1778 columns: vec![AddColumnRequest {
1779 column_schema: ColumnSchema::new(
1780 "weny",
1781 ConcreteDataType::string_datatype(),
1782 false,
1783 ),
1784 is_key: false,
1785 location: None,
1786 add_if_not_exists: false,
1787 }],
1788 };
1789
1790 let err = meta
1791 .builder_with_alter_kind("my_table", &alter_kind)
1792 .err()
1793 .unwrap();
1794 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1795 }
1796
1797 #[test]
1798 fn test_remove_unknown_column() {
1799 let schema = Arc::new(new_test_schema());
1800 let meta = TableMetaBuilder::empty()
1801 .schema(schema)
1802 .primary_key_indices(vec![0])
1803 .engine("engine")
1804 .next_column_id(3)
1805 .build()
1806 .unwrap();
1807
1808 let alter_kind = AlterKind::DropColumns {
1809 names: vec![String::from("unknown")],
1810 };
1811
1812 let err = meta
1813 .builder_with_alter_kind("my_table", &alter_kind)
1814 .err()
1815 .unwrap();
1816 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1817 }
1818
1819 #[test]
1820 fn test_change_unknown_column_data_type() {
1821 let schema = Arc::new(new_test_schema());
1822 let meta = TableMetaBuilder::empty()
1823 .schema(schema)
1824 .primary_key_indices(vec![0])
1825 .engine("engine")
1826 .next_column_id(3)
1827 .build()
1828 .unwrap();
1829
1830 let alter_kind = AlterKind::ModifyColumnTypes {
1831 columns: vec![ModifyColumnTypeRequest {
1832 column_name: "unknown".to_string(),
1833 target_type: ConcreteDataType::string_datatype(),
1834 }],
1835 };
1836
1837 let err = meta
1838 .builder_with_alter_kind("my_table", &alter_kind)
1839 .err()
1840 .unwrap();
1841 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1842 }
1843
1844 #[test]
1845 fn test_remove_key_column() {
1846 let schema = Arc::new(new_test_schema());
1847 let meta = TableMetaBuilder::empty()
1848 .schema(schema)
1849 .primary_key_indices(vec![0])
1850 .engine("engine")
1851 .next_column_id(3)
1852 .build()
1853 .unwrap();
1854
1855 let alter_kind = AlterKind::DropColumns {
1857 names: vec![String::from("col1")],
1858 };
1859
1860 let err = meta
1861 .builder_with_alter_kind("my_table", &alter_kind)
1862 .err()
1863 .unwrap();
1864 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1865
1866 let alter_kind = AlterKind::DropColumns {
1868 names: vec![String::from("ts")],
1869 };
1870
1871 let err = meta
1872 .builder_with_alter_kind("my_table", &alter_kind)
1873 .err()
1874 .unwrap();
1875 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1876 }
1877
1878 #[test]
1879 fn test_remove_partition_column() {
1880 let schema = Arc::new(new_test_schema());
1881 let meta = TableMetaBuilder::empty()
1882 .schema(schema)
1883 .primary_key_indices(vec![])
1884 .partition_key_indices(vec![0])
1885 .engine("engine")
1886 .next_column_id(3)
1887 .build()
1888 .unwrap();
1889 let alter_kind = AlterKind::DropColumns {
1891 names: vec![String::from("col1")],
1892 };
1893
1894 let err = meta
1895 .builder_with_alter_kind("my_table", &alter_kind)
1896 .err()
1897 .unwrap();
1898 assert_matches!(err, Error::RemovePartitionColumn { .. });
1899 }
1900
1901 #[test]
1902 fn test_change_key_column_data_type() {
1903 let schema = Arc::new(new_test_schema());
1904 let meta = TableMetaBuilder::empty()
1905 .schema(schema)
1906 .primary_key_indices(vec![0])
1907 .engine("engine")
1908 .next_column_id(3)
1909 .build()
1910 .unwrap();
1911
1912 let alter_kind = AlterKind::ModifyColumnTypes {
1914 columns: vec![ModifyColumnTypeRequest {
1915 column_name: "col1".to_string(),
1916 target_type: ConcreteDataType::string_datatype(),
1917 }],
1918 };
1919
1920 let err = meta
1921 .builder_with_alter_kind("my_table", &alter_kind)
1922 .err()
1923 .unwrap();
1924 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1925
1926 let alter_kind = AlterKind::ModifyColumnTypes {
1928 columns: vec![ModifyColumnTypeRequest {
1929 column_name: "ts".to_string(),
1930 target_type: ConcreteDataType::string_datatype(),
1931 }],
1932 };
1933
1934 let err = meta
1935 .builder_with_alter_kind("my_table", &alter_kind)
1936 .err()
1937 .unwrap();
1938 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1939 }
1940
1941 #[test]
1942 fn test_alloc_new_column() {
1943 let schema = Arc::new(new_test_schema());
1944 let mut meta = TableMetaBuilder::empty()
1945 .schema(schema)
1946 .primary_key_indices(vec![0])
1947 .engine("engine")
1948 .next_column_id(3)
1949 .build()
1950 .unwrap();
1951 assert_eq!(3, meta.next_column_id);
1952
1953 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1954 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1955
1956 assert_eq!(4, meta.next_column_id);
1957 assert_eq!(column_schema.name, desc.name);
1958 }
1959
1960 #[test]
1961 fn test_add_columns_with_location() {
1962 let schema = Arc::new(new_test_schema());
1963 let meta = TableMetaBuilder::empty()
1964 .schema(schema)
1965 .primary_key_indices(vec![0])
1966 .partition_key_indices(vec![0, 2])
1968 .engine("engine")
1969 .next_column_id(3)
1970 .build()
1971 .unwrap();
1972
1973 let new_meta = add_columns_to_meta_with_location(&meta);
1974 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1975
1976 let names: Vec<String> = new_meta
1977 .schema
1978 .column_schemas()
1979 .iter()
1980 .map(|column_schema| column_schema.name.clone())
1981 .collect();
1982 assert_eq!(
1983 &[
1984 "my_tag_first", "col1", "ts", "yet_another_field_after_ts", "my_field_after_ts", "col2", ],
1991 &names[..]
1992 );
1993 assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
1994 assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
1995 assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
1996 }
1997
1998 #[test]
1999 fn test_modify_column_fulltext_options() {
2000 let schema = Arc::new(new_test_schema());
2001 let meta = TableMetaBuilder::empty()
2002 .schema(schema)
2003 .primary_key_indices(vec![0])
2004 .engine("engine")
2005 .next_column_id(3)
2006 .build()
2007 .unwrap();
2008
2009 let alter_kind = AlterKind::SetIndexes {
2010 options: vec![SetIndexOption::Fulltext {
2011 column_name: "col1".to_string(),
2012 options: FulltextOptions::default(),
2013 }],
2014 };
2015 let err = meta
2016 .builder_with_alter_kind("my_table", &alter_kind)
2017 .err()
2018 .unwrap();
2019 assert_eq!(
2020 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2021 err.to_string()
2022 );
2023
2024 let new_meta = add_columns_to_meta_with_location(&meta);
2026 assert_eq!(meta.region_numbers, new_meta.region_numbers);
2027
2028 let alter_kind = AlterKind::SetIndexes {
2029 options: vec![SetIndexOption::Fulltext {
2030 column_name: "my_tag_first".to_string(),
2031 options: FulltextOptions::new_unchecked(
2032 true,
2033 FulltextAnalyzer::Chinese,
2034 true,
2035 FulltextBackend::Bloom,
2036 1000,
2037 0.01,
2038 ),
2039 }],
2040 };
2041 let new_meta = new_meta
2042 .builder_with_alter_kind("my_table", &alter_kind)
2043 .unwrap()
2044 .build()
2045 .unwrap();
2046 let column_schema = new_meta
2047 .schema
2048 .column_schema_by_name("my_tag_first")
2049 .unwrap();
2050 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2051 assert!(fulltext_options.enable);
2052 assert_eq!(
2053 datatypes::schema::FulltextAnalyzer::Chinese,
2054 fulltext_options.analyzer
2055 );
2056 assert!(fulltext_options.case_sensitive);
2057
2058 let alter_kind = AlterKind::UnsetIndexes {
2059 options: vec![UnsetIndexOption::Fulltext {
2060 column_name: "my_tag_first".to_string(),
2061 }],
2062 };
2063 let new_meta = new_meta
2064 .builder_with_alter_kind("my_table", &alter_kind)
2065 .unwrap()
2066 .build()
2067 .unwrap();
2068 let column_schema = new_meta
2069 .schema
2070 .column_schema_by_name("my_tag_first")
2071 .unwrap();
2072 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2073 assert!(!fulltext_options.enable);
2074 }
2075}