1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50 Unsupported,
52 Inexact,
57 Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
83#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86 Base,
88 View,
90 Temporary,
92}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
106pub struct TableIdent {
107 pub table_id: TableId,
109 pub version: TableVersion,
112}
113
114#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
118#[builder(pattern = "mutable", custom_constructor)]
119pub struct TableMeta {
120 pub schema: SchemaRef,
121 pub primary_key_indices: Vec<usize>,
124 #[builder(default = "self.default_value_indices()?")]
125 pub value_indices: Vec<usize>,
126 #[builder(default, setter(into))]
127 pub engine: String,
128 #[builder(default, setter(into))]
129 pub region_numbers: Vec<u32>,
130 pub next_column_id: ColumnId,
131 #[builder(default)]
133 pub options: TableOptions,
134 #[builder(default = "Utc::now()")]
135 pub created_on: DateTime<Utc>,
136 #[builder(default = "Vec::new()")]
137 pub partition_key_indices: Vec<usize>,
138 #[builder(default = "Vec::new()")]
139 pub column_ids: Vec<ColumnId>,
140}
141
142impl TableMetaBuilder {
143 #[cfg(any(test, feature = "testing"))]
145 pub fn empty() -> Self {
146 Self {
147 schema: None,
148 primary_key_indices: None,
149 value_indices: None,
150 engine: None,
151 region_numbers: None,
152 next_column_id: None,
153 options: None,
154 created_on: None,
155 partition_key_indices: None,
156 column_ids: None,
157 }
158 }
159}
160
161impl TableMetaBuilder {
162 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
163 match (&self.primary_key_indices, &self.schema) {
164 (Some(v), Some(schema)) => {
165 let column_schemas = schema.column_schemas();
166 Ok((0..column_schemas.len())
167 .filter(|idx| !v.contains(idx))
168 .collect())
169 }
170 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
171 }
172 }
173
174 pub fn new_external_table() -> Self {
175 Self {
176 schema: None,
177 primary_key_indices: Some(Vec::new()),
178 value_indices: Some(Vec::new()),
179 engine: None,
180 region_numbers: Some(Vec::new()),
181 next_column_id: Some(0),
182 options: None,
183 created_on: None,
184 partition_key_indices: None,
185 column_ids: None,
186 }
187 }
188}
189
190struct SplitResult<'a> {
192 columns_at_first: Vec<&'a AddColumnRequest>,
194 columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
196 columns_at_last: Vec<&'a AddColumnRequest>,
198 column_names: Vec<String>,
200}
201
202impl TableMeta {
203 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
204 let columns_schemas = &self.schema.column_schemas();
205 self.primary_key_indices
206 .iter()
207 .map(|idx| &columns_schemas[*idx].name)
208 }
209
210 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
211 let columns_schemas = self.schema.column_schemas();
213 let primary_key_indices = &self.primary_key_indices;
214 columns_schemas
215 .iter()
216 .enumerate()
217 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
218 .map(|(_, cs)| &cs.name)
219 }
220
221 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
222 let columns_schemas = &self.schema.column_schemas();
223 self.partition_key_indices
224 .iter()
225 .map(|idx| &columns_schemas[*idx].name)
226 }
227
228 pub fn builder_with_alter_kind(
232 &self,
233 table_name: &str,
234 alter_kind: &AlterKind,
235 ) -> Result<TableMetaBuilder> {
236 match alter_kind {
237 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
238 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
239 AlterKind::ModifyColumnTypes { columns } => {
240 self.modify_column_types(table_name, columns)
241 }
242 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
244 AlterKind::SetTableOptions { options } => self.set_table_options(options),
245 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
246 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
247 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
248 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
249 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
250 }
251 }
252
253 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
255 let mut new_options = self.options.clone();
256
257 for request in requests {
258 match request {
259 SetRegionOption::Ttl(new_ttl) => {
260 new_options.ttl = *new_ttl;
261 }
262 SetRegionOption::Twsc(key, value) => {
263 if !value.is_empty() {
264 new_options
265 .extra_options
266 .insert(key.to_string(), value.to_string());
267 new_options.extra_options.insert(
269 COMPACTION_TYPE.to_string(),
270 COMPACTION_TYPE_TWCS.to_string(),
271 );
272 } else {
273 new_options.extra_options.remove(key.as_str());
275 }
276 }
277 }
278 }
279 let mut builder = self.new_meta_builder();
280 builder.options(new_options);
281
282 Ok(builder)
283 }
284
285 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
286 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
287 self.set_table_options(&requests)
288 }
289
290 fn set_indexes(
291 &self,
292 table_name: &str,
293 requests: &[SetIndexOption],
294 ) -> Result<TableMetaBuilder> {
295 let table_schema = &self.schema;
296 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
297 for request in requests {
298 let column_name = request.column_name();
299 table_schema
300 .column_index_by_name(column_name)
301 .with_context(|| error::ColumnNotExistsSnafu {
302 column_name,
303 table_name,
304 })?;
305 set_index_options
306 .entry(column_name)
307 .or_default()
308 .push(request);
309 }
310
311 let mut meta_builder = self.new_meta_builder();
312 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
313 for mut column in table_schema.column_schemas().iter().cloned() {
314 if let Some(request) = set_index_options.get(column.name.as_str()) {
315 for request in request {
316 self.set_index(&mut column, request)?;
317 }
318 }
319 columns.push(column);
320 }
321
322 let mut builder = SchemaBuilder::try_from_columns(columns)
323 .with_context(|_| error::SchemaBuildSnafu {
324 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
325 })?
326 .version(table_schema.version() + 1);
327
328 for (k, v) in table_schema.metadata().iter() {
329 builder = builder.add_metadata(k, v);
330 }
331
332 let new_schema = builder.build().with_context(|_| {
333 let column_names = requests
334 .iter()
335 .map(|request| request.column_name())
336 .collect::<Vec<_>>();
337 error::SchemaBuildSnafu {
338 msg: format!(
339 "Table {table_name} cannot set index options with columns {column_names:?}",
340 ),
341 }
342 })?;
343 let _ = meta_builder
344 .schema(Arc::new(new_schema))
345 .primary_key_indices(self.primary_key_indices.clone());
346
347 Ok(meta_builder)
348 }
349
350 fn unset_indexes(
351 &self,
352 table_name: &str,
353 requests: &[UnsetIndexOption],
354 ) -> Result<TableMetaBuilder> {
355 let table_schema = &self.schema;
356 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
357 for request in requests {
358 let column_name = request.column_name();
359 table_schema
360 .column_index_by_name(column_name)
361 .with_context(|| error::ColumnNotExistsSnafu {
362 column_name,
363 table_name,
364 })?;
365 set_index_options
366 .entry(column_name)
367 .or_default()
368 .push(request);
369 }
370
371 let mut meta_builder = self.new_meta_builder();
372 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
373 for mut column in table_schema.column_schemas().iter().cloned() {
374 if let Some(request) = set_index_options.get(column.name.as_str()) {
375 for request in request {
376 self.unset_index(&mut column, request)?;
377 }
378 }
379 columns.push(column);
380 }
381
382 let mut builder = SchemaBuilder::try_from_columns(columns)
383 .with_context(|_| error::SchemaBuildSnafu {
384 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
385 })?
386 .version(table_schema.version() + 1);
387
388 for (k, v) in table_schema.metadata().iter() {
389 builder = builder.add_metadata(k, v);
390 }
391
392 let new_schema = builder.build().with_context(|_| {
393 let column_names = requests
394 .iter()
395 .map(|request| request.column_name())
396 .collect::<Vec<_>>();
397 error::SchemaBuildSnafu {
398 msg: format!(
399 "Table {table_name} cannot set index options with columns {column_names:?}",
400 ),
401 }
402 })?;
403 let _ = meta_builder
404 .schema(Arc::new(new_schema))
405 .primary_key_indices(self.primary_key_indices.clone());
406
407 Ok(meta_builder)
408 }
409
410 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
411 match request {
412 SetIndexOption::Fulltext {
413 column_name,
414 options,
415 } => {
416 ensure!(
417 column_schema.data_type.is_string(),
418 error::InvalidColumnOptionSnafu {
419 column_name,
420 msg: "FULLTEXT index only supports string type",
421 }
422 );
423
424 let current_fulltext_options = column_schema
425 .fulltext_options()
426 .context(error::SetFulltextOptionsSnafu { column_name })?;
427 set_column_fulltext_options(
428 column_schema,
429 column_name,
430 options,
431 current_fulltext_options,
432 )?;
433 }
434 SetIndexOption::Inverted { column_name } => {
435 debug_assert_eq!(column_schema.name, *column_name);
436 column_schema.set_inverted_index(true);
437 }
438 SetIndexOption::Skipping {
439 column_name,
440 options,
441 } => {
442 set_column_skipping_index_options(column_schema, column_name, options)?;
443 }
444 }
445
446 Ok(())
447 }
448
449 fn unset_index(
450 &self,
451 column_schema: &mut ColumnSchema,
452 request: &UnsetIndexOption,
453 ) -> Result<()> {
454 match request {
455 UnsetIndexOption::Fulltext { column_name } => {
456 let current_fulltext_options = column_schema
457 .fulltext_options()
458 .context(error::SetFulltextOptionsSnafu { column_name })?;
459 unset_column_fulltext_options(
460 column_schema,
461 column_name,
462 current_fulltext_options.clone(),
463 )?
464 }
465 UnsetIndexOption::Inverted { .. } => {
466 column_schema.set_inverted_index(false);
467 }
468 UnsetIndexOption::Skipping { column_name } => {
469 unset_column_skipping_index_options(column_schema, column_name)?;
470 }
471 }
472
473 Ok(())
474 }
475
476 pub fn alloc_new_column(
481 &mut self,
482 table_name: &str,
483 new_column: &ColumnSchema,
484 ) -> Result<ColumnDescriptor> {
485 let desc = ColumnDescriptorBuilder::new(
486 self.next_column_id as ColumnId,
487 &new_column.name,
488 new_column.data_type.clone(),
489 )
490 .is_nullable(new_column.is_nullable())
491 .default_constraint(new_column.default_constraint().cloned())
492 .build()
493 .context(error::BuildColumnDescriptorSnafu {
494 table_name,
495 column_name: &new_column.name,
496 })?;
497
498 self.next_column_id += 1;
500
501 Ok(desc)
502 }
503
504 fn new_meta_builder(&self) -> TableMetaBuilder {
506 let mut builder = TableMetaBuilder::from(self);
507 builder.value_indices = None;
509 builder
510 }
511
512 fn add_columns(
514 &self,
515 table_name: &str,
516 requests: &[AddColumnRequest],
517 ) -> Result<TableMetaBuilder> {
518 let table_schema = &self.schema;
519 let mut meta_builder = self.new_meta_builder();
520 let original_primary_key_indices: HashSet<&usize> =
521 self.primary_key_indices.iter().collect();
522
523 let mut names = HashSet::with_capacity(requests.len());
524 let mut new_columns = Vec::with_capacity(requests.len());
525 for col_to_add in requests {
526 if let Some(column_schema) =
527 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
528 {
529 ensure!(
531 col_to_add.add_if_not_exists,
532 error::ColumnExistsSnafu {
533 table_name,
534 column_name: &col_to_add.column_schema.name
535 },
536 );
537
538 ensure!(
540 column_schema.data_type == col_to_add.column_schema.data_type,
541 error::InvalidAlterRequestSnafu {
542 table: table_name,
543 err: format!(
544 "column {} already exists with different type {:?}",
545 col_to_add.column_schema.name, column_schema.data_type,
546 ),
547 }
548 );
549 } else {
550 ensure!(
553 names.insert(&col_to_add.column_schema.name),
554 error::InvalidAlterRequestSnafu {
555 table: table_name,
556 err: format!(
557 "add column {} more than once",
558 col_to_add.column_schema.name
559 ),
560 }
561 );
562
563 ensure!(
564 col_to_add.column_schema.is_nullable()
565 || col_to_add.column_schema.default_constraint().is_some(),
566 error::InvalidAlterRequestSnafu {
567 table: table_name,
568 err: format!(
569 "no default value for column {}",
570 col_to_add.column_schema.name
571 ),
572 },
573 );
574
575 new_columns.push(col_to_add.clone());
576 }
577 }
578 let requests = &new_columns[..];
579
580 let SplitResult {
581 columns_at_first,
582 columns_at_after,
583 columns_at_last,
584 column_names,
585 } = self.split_requests_by_column_location(table_name, requests)?;
586 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
587 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
588 columns_at_first.iter().rev().for_each(|request| {
590 if request.is_key {
591 primary_key_indices.push(columns.len());
593 }
594 columns.push(request.column_schema.clone());
595 });
596 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
598 if original_primary_key_indices.contains(&index) {
599 primary_key_indices.push(columns.len());
600 }
601 columns.push(column_schema.clone());
602 if let Some(requests) = columns_at_after.get(&column_schema.name) {
603 requests.iter().rev().for_each(|request| {
604 if request.is_key {
605 primary_key_indices.push(columns.len());
607 }
608 columns.push(request.column_schema.clone());
609 });
610 }
611 }
612 columns_at_last.iter().for_each(|request| {
614 if request.is_key {
615 primary_key_indices.push(columns.len());
617 }
618 columns.push(request.column_schema.clone());
619 });
620
621 let mut builder = SchemaBuilder::try_from(columns)
622 .with_context(|_| error::SchemaBuildSnafu {
623 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
624 })?
625 .version(table_schema.version() + 1);
627 for (k, v) in table_schema.metadata().iter() {
628 builder = builder.add_metadata(k, v);
629 }
630 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
631 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
632 })?;
633
634 let partition_key_indices = self
635 .partition_key_indices
636 .iter()
637 .map(|idx| table_schema.column_name_by_index(*idx))
638 .map(|name| new_schema.column_index_by_name(name).unwrap())
640 .collect();
641
642 let _ = meta_builder
644 .schema(Arc::new(new_schema))
645 .primary_key_indices(primary_key_indices)
646 .partition_key_indices(partition_key_indices);
647
648 Ok(meta_builder)
649 }
650
651 fn remove_columns(
652 &self,
653 table_name: &str,
654 column_names: &[String],
655 ) -> Result<TableMetaBuilder> {
656 let table_schema = &self.schema;
657 let column_names: HashSet<_> = column_names.iter().collect();
658 let mut meta_builder = self.new_meta_builder();
659
660 let timestamp_index = table_schema.timestamp_index();
661 for column_name in &column_names {
663 if let Some(index) = table_schema.column_index_by_name(column_name) {
664 ensure!(
667 !self.primary_key_indices.contains(&index),
668 error::RemoveColumnInIndexSnafu {
669 column_name: *column_name,
670 table_name,
671 }
672 );
673
674 ensure!(
675 !self.partition_key_indices.contains(&index),
676 error::RemovePartitionColumnSnafu {
677 column_name: *column_name,
678 table_name,
679 }
680 );
681
682 if let Some(ts_index) = timestamp_index {
683 ensure!(
685 index != ts_index,
686 error::RemoveColumnInIndexSnafu {
687 column_name: table_schema.column_name_by_index(ts_index),
688 table_name,
689 }
690 );
691 }
692 } else {
693 return error::ColumnNotExistsSnafu {
694 column_name: *column_name,
695 table_name,
696 }
697 .fail()?;
698 }
699 }
700
701 let columns: Vec<_> = table_schema
703 .column_schemas()
704 .iter()
705 .filter(|column_schema| !column_names.contains(&column_schema.name))
706 .cloned()
707 .collect();
708
709 let mut builder = SchemaBuilder::try_from_columns(columns)
710 .with_context(|_| error::SchemaBuildSnafu {
711 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
712 })?
713 .version(table_schema.version() + 1);
715 for (k, v) in table_schema.metadata().iter() {
716 builder = builder.add_metadata(k, v);
717 }
718 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
719 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
720 })?;
721
722 let primary_key_indices = self
724 .primary_key_indices
725 .iter()
726 .map(|idx| table_schema.column_name_by_index(*idx))
727 .map(|name| new_schema.column_index_by_name(name).unwrap())
729 .collect();
730
731 let partition_key_indices = self
732 .partition_key_indices
733 .iter()
734 .map(|idx| table_schema.column_name_by_index(*idx))
735 .map(|name| new_schema.column_index_by_name(name).unwrap())
737 .collect();
738
739 let _ = meta_builder
740 .schema(Arc::new(new_schema))
741 .primary_key_indices(primary_key_indices)
742 .partition_key_indices(partition_key_indices);
743
744 Ok(meta_builder)
745 }
746
747 fn modify_column_types(
748 &self,
749 table_name: &str,
750 requests: &[ModifyColumnTypeRequest],
751 ) -> Result<TableMetaBuilder> {
752 let table_schema = &self.schema;
753 let mut meta_builder = self.new_meta_builder();
754
755 let mut modify_column_types = HashMap::with_capacity(requests.len());
756 let timestamp_index = table_schema.timestamp_index();
757
758 for col_to_change in requests {
759 let change_column_name = &col_to_change.column_name;
760
761 let index = table_schema
762 .column_index_by_name(change_column_name)
763 .with_context(|| error::ColumnNotExistsSnafu {
764 column_name: change_column_name,
765 table_name,
766 })?;
767
768 let column = &table_schema.column_schemas()[index];
769
770 ensure!(
771 !self.primary_key_indices.contains(&index),
772 error::InvalidAlterRequestSnafu {
773 table: table_name,
774 err: format!(
775 "Not allowed to change primary key index column '{}'",
776 column.name
777 )
778 }
779 );
780
781 if let Some(ts_index) = timestamp_index {
782 ensure!(
784 index != ts_index,
785 error::InvalidAlterRequestSnafu {
786 table: table_name,
787 err: format!(
788 "Not allowed to change timestamp index column '{}' datatype",
789 column.name
790 )
791 }
792 );
793 }
794
795 ensure!(
796 modify_column_types
797 .insert(&col_to_change.column_name, col_to_change)
798 .is_none(),
799 error::InvalidAlterRequestSnafu {
800 table: table_name,
801 err: format!(
802 "change column datatype {} more than once",
803 col_to_change.column_name
804 ),
805 }
806 );
807
808 ensure!(
809 column
810 .data_type
811 .can_arrow_type_cast_to(&col_to_change.target_type),
812 error::InvalidAlterRequestSnafu {
813 table: table_name,
814 err: format!(
815 "column '{}' cannot be cast automatically to type '{}'",
816 col_to_change.column_name, col_to_change.target_type,
817 ),
818 }
819 );
820
821 ensure!(
822 column.is_nullable(),
823 error::InvalidAlterRequestSnafu {
824 table: table_name,
825 err: format!(
826 "column '{}' must be nullable to ensure safe conversion.",
827 col_to_change.column_name,
828 ),
829 }
830 );
831 }
832 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
835 for mut column in table_schema.column_schemas().iter().cloned() {
836 if let Some(change_column) = modify_column_types.get(&column.name) {
837 column.data_type = change_column.target_type.clone();
838 let new_default = if let Some(default_value) = column.default_constraint() {
839 Some(
840 default_value
841 .cast_to_datatype(&change_column.target_type)
842 .with_context(|_| error::CastDefaultValueSnafu {
843 reason: format!(
844 "Failed to cast default value from {:?} to type {:?}",
845 default_value, &change_column.target_type
846 ),
847 })?,
848 )
849 } else {
850 None
851 };
852 column = column
853 .clone()
854 .with_default_constraint(new_default.clone())
855 .with_context(|_| error::CastDefaultValueSnafu {
856 reason: format!("Failed to set new default: {:?}", new_default),
857 })?;
858 }
859 columns.push(column)
860 }
861
862 let mut builder = SchemaBuilder::try_from_columns(columns)
863 .with_context(|_| error::SchemaBuildSnafu {
864 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
865 })?
866 .version(table_schema.version() + 1);
868 for (k, v) in table_schema.metadata().iter() {
869 builder = builder.add_metadata(k, v);
870 }
871 let new_schema = builder.build().with_context(|_| {
872 let column_names: Vec<_> = requests
873 .iter()
874 .map(|request| &request.column_name)
875 .collect();
876
877 error::SchemaBuildSnafu {
878 msg: format!(
879 "Table {table_name} cannot change datatype with columns {column_names:?}"
880 ),
881 }
882 })?;
883
884 let _ = meta_builder
885 .schema(Arc::new(new_schema))
886 .primary_key_indices(self.primary_key_indices.clone());
887
888 Ok(meta_builder)
889 }
890
891 fn split_requests_by_column_location<'a>(
893 &self,
894 table_name: &str,
895 requests: &'a [AddColumnRequest],
896 ) -> Result<SplitResult<'a>> {
897 let table_schema = &self.schema;
898 let mut columns_at_first = Vec::new();
899 let mut columns_at_after = HashMap::new();
900 let mut columns_at_last = Vec::new();
901 let mut column_names = Vec::with_capacity(requests.len());
902 for request in requests {
903 let column_name = &request.column_schema.name;
905 column_names.push(column_name.clone());
906 ensure!(
907 table_schema.column_schema_by_name(column_name).is_none(),
908 error::ColumnExistsSnafu {
909 column_name,
910 table_name,
911 }
912 );
913 match request.location.as_ref() {
914 Some(AddColumnLocation::First) => {
915 columns_at_first.push(request);
916 }
917 Some(AddColumnLocation::After { column_name }) => {
918 ensure!(
919 table_schema.column_schema_by_name(column_name).is_some(),
920 error::ColumnNotExistsSnafu {
921 column_name,
922 table_name,
923 }
924 );
925 columns_at_after
926 .entry(column_name.clone())
927 .or_insert(Vec::new())
928 .push(request);
929 }
930 None => {
931 columns_at_last.push(request);
932 }
933 }
934 }
935 Ok(SplitResult {
936 columns_at_first,
937 columns_at_after,
938 columns_at_last,
939 column_names,
940 })
941 }
942
943 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
944 let table_schema = &self.schema;
945 let mut meta_builder = self.new_meta_builder();
946 let mut columns = Vec::with_capacity(table_schema.num_columns());
947 for column_schema in table_schema.column_schemas() {
948 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
949 ensure!(
951 column_schema.default_constraint().is_some(),
952 error::InvalidAlterRequestSnafu {
953 table: table_name,
954 err: format!("column {name} does not have a default value"),
955 }
956 );
957 if !column_schema.is_nullable() {
958 return error::InvalidAlterRequestSnafu {
959 table: table_name,
960 err: format!(
961 "column {name} is not nullable and `default` cannot be dropped",
962 ),
963 }
964 .fail();
965 }
966 let new_column_schema = column_schema.clone();
967 let new_column_schema = new_column_schema
968 .with_default_constraint(None)
969 .with_context(|_| error::SchemaBuildSnafu {
970 msg: format!("Table {table_name} cannot drop default values"),
971 })?;
972 columns.push(new_column_schema);
973 } else {
974 columns.push(column_schema.clone());
975 }
976 }
977
978 let mut builder = SchemaBuilder::try_from_columns(columns)
979 .with_context(|_| error::SchemaBuildSnafu {
980 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
981 })?
982 .version(table_schema.version() + 1);
984 for (k, v) in table_schema.metadata().iter() {
985 builder = builder.add_metadata(k, v);
986 }
987 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
988 msg: format!("Table {table_name} cannot drop default values"),
989 })?;
990
991 let _ = meta_builder.schema(Arc::new(new_schema));
992
993 Ok(meta_builder)
994 }
995
996 fn set_defaults(
997 &self,
998 table_name: &str,
999 set_defaults: &[SetDefaultRequest],
1000 ) -> Result<TableMetaBuilder> {
1001 let table_schema = &self.schema;
1002 let mut meta_builder = self.new_meta_builder();
1003 let mut columns = Vec::with_capacity(table_schema.num_columns());
1004 for column_schema in table_schema.column_schemas() {
1005 if let Some(set_default) = set_defaults
1006 .iter()
1007 .find(|s| s.column_name == column_schema.name)
1008 {
1009 let new_column_schema = column_schema.clone();
1010 let new_column_schema = new_column_schema
1011 .with_default_constraint(set_default.default_constraint.clone())
1012 .with_context(|_| error::SchemaBuildSnafu {
1013 msg: format!("Table {table_name} cannot set default values"),
1014 })?;
1015 columns.push(new_column_schema);
1016 } else {
1017 columns.push(column_schema.clone());
1018 }
1019 }
1020
1021 let mut builder = SchemaBuilder::try_from_columns(columns)
1022 .with_context(|_| error::SchemaBuildSnafu {
1023 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1024 })?
1025 .version(table_schema.version() + 1);
1027 for (k, v) in table_schema.metadata().iter() {
1028 builder = builder.add_metadata(k, v);
1029 }
1030 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1031 msg: format!("Table {table_name} cannot set default values"),
1032 })?;
1033
1034 let _ = meta_builder.schema(Arc::new(new_schema));
1035
1036 Ok(meta_builder)
1037 }
1038}
1039
1040#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1041#[builder(pattern = "owned")]
1042pub struct TableInfo {
1043 #[builder(default, setter(into))]
1045 pub ident: TableIdent,
1046 #[builder(setter(into))]
1048 pub name: String,
1049 #[builder(default, setter(into))]
1051 pub desc: Option<String>,
1052 #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1053 pub catalog_name: String,
1054 #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1055 pub schema_name: String,
1056 pub meta: TableMeta,
1057 #[builder(default = "TableType::Base")]
1058 pub table_type: TableType,
1059}
1060
1061pub type TableInfoRef = Arc<TableInfo>;
1062
1063impl TableInfo {
1064 pub fn table_id(&self) -> TableId {
1065 self.ident.table_id
1066 }
1067
1068 pub fn region_ids(&self) -> Vec<RegionId> {
1069 self.meta
1070 .region_numbers
1071 .iter()
1072 .map(|id| RegionId::new(self.table_id(), *id))
1073 .collect()
1074 }
1075 pub fn full_table_name(&self) -> String {
1077 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1078 }
1079
1080 pub fn get_db_string(&self) -> String {
1081 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1082 }
1083
1084 pub fn is_physical_table(&self) -> bool {
1086 self.meta
1087 .options
1088 .extra_options
1089 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1090 }
1091
1092 pub fn is_ttl_instant_table(&self) -> bool {
1094 self.meta
1095 .options
1096 .ttl
1097 .map(|t| t.is_instant())
1098 .unwrap_or(false)
1099 }
1100}
1101
1102impl TableInfoBuilder {
1103 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1104 Self {
1105 name: Some(name.into()),
1106 meta: Some(meta),
1107 ..Default::default()
1108 }
1109 }
1110
1111 pub fn table_id(mut self, id: TableId) -> Self {
1112 let ident = self.ident.get_or_insert_with(TableIdent::default);
1113 ident.table_id = id;
1114 self
1115 }
1116
1117 pub fn table_version(mut self, version: TableVersion) -> Self {
1118 let ident = self.ident.get_or_insert_with(TableIdent::default);
1119 ident.version = version;
1120 self
1121 }
1122}
1123
1124impl TableIdent {
1125 pub fn new(table_id: TableId) -> Self {
1126 Self {
1127 table_id,
1128 version: 0,
1129 }
1130 }
1131}
1132
1133impl From<TableId> for TableIdent {
1134 fn from(table_id: TableId) -> Self {
1135 Self::new(table_id)
1136 }
1137}
1138
1139#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1141pub struct RawTableMeta {
1142 pub schema: RawSchema,
1143 pub primary_key_indices: Vec<usize>,
1146 pub value_indices: Vec<usize>,
1149 pub engine: String,
1151 pub next_column_id: ColumnId,
1154 pub region_numbers: Vec<u32>,
1155 pub options: TableOptions,
1156 pub created_on: DateTime<Utc>,
1157 #[serde(default)]
1159 pub partition_key_indices: Vec<usize>,
1160 #[serde(default)]
1163 pub column_ids: Vec<ColumnId>,
1164}
1165
1166impl From<TableMeta> for RawTableMeta {
1167 fn from(meta: TableMeta) -> RawTableMeta {
1168 RawTableMeta {
1169 schema: RawSchema::from(&*meta.schema),
1170 primary_key_indices: meta.primary_key_indices,
1171 value_indices: meta.value_indices,
1172 engine: meta.engine,
1173 next_column_id: meta.next_column_id,
1174 region_numbers: meta.region_numbers,
1175 options: meta.options,
1176 created_on: meta.created_on,
1177 partition_key_indices: meta.partition_key_indices,
1178 column_ids: meta.column_ids,
1179 }
1180 }
1181}
1182
1183impl TryFrom<RawTableMeta> for TableMeta {
1184 type Error = ConvertError;
1185
1186 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1187 Ok(TableMeta {
1188 schema: Arc::new(Schema::try_from(raw.schema)?),
1189 primary_key_indices: raw.primary_key_indices,
1190 value_indices: raw.value_indices,
1191 engine: raw.engine,
1192 region_numbers: raw.region_numbers,
1193 next_column_id: raw.next_column_id,
1194 options: raw.options,
1195 created_on: raw.created_on,
1196 partition_key_indices: raw.partition_key_indices,
1197 column_ids: raw.column_ids,
1198 })
1199 }
1200}
1201
1202#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1204pub struct RawTableInfo {
1205 pub ident: TableIdent,
1206 pub name: String,
1207 pub desc: Option<String>,
1208 pub catalog_name: String,
1209 pub schema_name: String,
1210 pub meta: RawTableMeta,
1211 pub table_type: TableType,
1212}
1213
1214impl RawTableInfo {
1215 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1219 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1220 None
1221 } else {
1222 Some(
1223 self.meta
1224 .column_ids
1225 .iter()
1226 .enumerate()
1227 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1228 .collect(),
1229 )
1230 }
1231 }
1232
1233 pub fn sort_columns(&mut self) {
1235 let column_schemas = &self.meta.schema.column_schemas;
1236 let primary_keys = self
1237 .meta
1238 .primary_key_indices
1239 .iter()
1240 .map(|index| column_schemas[*index].name.clone())
1241 .collect::<HashSet<_>>();
1242
1243 let name_to_ids = self.name_to_ids().unwrap_or_default();
1244 self.meta
1245 .schema
1246 .column_schemas
1247 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1248
1249 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1251 let mut timestamp_index = None;
1252 let mut value_indices =
1253 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1254 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1255 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1256 if primary_keys.contains(&column_schema.name) {
1257 primary_key_indices.push(index);
1258 } else if column_schema.is_time_index() {
1259 value_indices.push(index);
1260 timestamp_index = Some(index);
1261 } else {
1262 value_indices.push(index);
1263 }
1264 if let Some(id) = name_to_ids.get(&column_schema.name) {
1265 column_ids.push(*id);
1266 }
1267 }
1268
1269 self.meta.schema.timestamp_index = timestamp_index;
1271 self.meta.primary_key_indices = primary_key_indices;
1272 self.meta.value_indices = value_indices;
1273 self.meta.column_ids = column_ids;
1274 }
1275
1276 pub fn to_region_options(&self) -> HashMap<String, String> {
1280 HashMap::from(&self.meta.options)
1281 }
1282
1283 pub fn table_ref(&self) -> TableReference {
1285 TableReference::full(
1286 self.catalog_name.as_str(),
1287 self.schema_name.as_str(),
1288 self.name.as_str(),
1289 )
1290 }
1291}
1292
1293impl From<TableInfo> for RawTableInfo {
1294 fn from(info: TableInfo) -> RawTableInfo {
1295 RawTableInfo {
1296 ident: info.ident,
1297 name: info.name,
1298 desc: info.desc,
1299 catalog_name: info.catalog_name,
1300 schema_name: info.schema_name,
1301 meta: RawTableMeta::from(info.meta),
1302 table_type: info.table_type,
1303 }
1304 }
1305}
1306
1307impl TryFrom<RawTableInfo> for TableInfo {
1308 type Error = ConvertError;
1309
1310 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1311 Ok(TableInfo {
1312 ident: raw.ident,
1313 name: raw.name,
1314 desc: raw.desc,
1315 catalog_name: raw.catalog_name,
1316 schema_name: raw.schema_name,
1317 meta: TableMeta::try_from(raw.meta)?,
1318 table_type: raw.table_type,
1319 })
1320 }
1321}
1322
1323fn set_column_fulltext_options(
1332 column_schema: &mut ColumnSchema,
1333 column_name: &str,
1334 options: &FulltextOptions,
1335 current_options: Option<FulltextOptions>,
1336) -> Result<()> {
1337 if let Some(current_options) = current_options {
1338 ensure!(
1339 current_options.analyzer == options.analyzer
1340 && current_options.case_sensitive == options.case_sensitive,
1341 error::InvalidColumnOptionSnafu {
1342 column_name,
1343 msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1344 current_options.analyzer, current_options.case_sensitive),
1345 }
1346 );
1347 }
1348
1349 column_schema
1350 .set_fulltext_options(options)
1351 .context(error::SetFulltextOptionsSnafu { column_name })?;
1352
1353 Ok(())
1354}
1355
1356fn unset_column_fulltext_options(
1357 column_schema: &mut ColumnSchema,
1358 column_name: &str,
1359 current_options: Option<FulltextOptions>,
1360) -> Result<()> {
1361 ensure!(
1362 current_options
1363 .as_ref()
1364 .is_some_and(|options| options.enable),
1365 error::InvalidColumnOptionSnafu {
1366 column_name,
1367 msg: "FULLTEXT index already disabled".to_string(),
1368 }
1369 );
1370
1371 let mut options = current_options.unwrap();
1372 options.enable = false;
1373 column_schema
1374 .set_fulltext_options(&options)
1375 .context(error::SetFulltextOptionsSnafu { column_name })?;
1376
1377 Ok(())
1378}
1379
1380fn set_column_skipping_index_options(
1381 column_schema: &mut ColumnSchema,
1382 column_name: &str,
1383 options: &SkippingIndexOptions,
1384) -> Result<()> {
1385 column_schema
1386 .set_skipping_options(options)
1387 .context(error::SetSkippingOptionsSnafu { column_name })?;
1388
1389 Ok(())
1390}
1391
1392fn unset_column_skipping_index_options(
1393 column_schema: &mut ColumnSchema,
1394 column_name: &str,
1395) -> Result<()> {
1396 column_schema
1397 .unset_skipping_options()
1398 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1399 Ok(())
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404 use std::assert_matches::assert_matches;
1405
1406 use common_error::ext::ErrorExt;
1407 use common_error::status_code::StatusCode;
1408 use datatypes::data_type::ConcreteDataType;
1409 use datatypes::schema::{
1410 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1411 };
1412
1413 use super::*;
1414 use crate::Error;
1415
1416 fn new_test_schema() -> Schema {
1418 let column_schemas = vec![
1419 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1420 ColumnSchema::new(
1421 "ts",
1422 ConcreteDataType::timestamp_millisecond_datatype(),
1423 false,
1424 )
1425 .with_time_index(true),
1426 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1427 ];
1428 SchemaBuilder::try_from(column_schemas)
1429 .unwrap()
1430 .version(123)
1431 .build()
1432 .unwrap()
1433 }
1434
1435 #[test]
1436 fn test_raw_convert() {
1437 let schema = Arc::new(new_test_schema());
1438 let meta = TableMetaBuilder::empty()
1439 .schema(schema)
1440 .primary_key_indices(vec![0])
1441 .engine("engine")
1442 .next_column_id(3)
1443 .build()
1444 .unwrap();
1445 let info = TableInfoBuilder::default()
1446 .table_id(10)
1447 .table_version(5)
1448 .name("mytable")
1449 .meta(meta)
1450 .build()
1451 .unwrap();
1452
1453 let raw = RawTableInfo::from(info.clone());
1454 let info_new = TableInfo::try_from(raw).unwrap();
1455
1456 assert_eq!(info, info_new);
1457 }
1458
1459 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1460 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1461 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1462 let alter_kind = AlterKind::AddColumns {
1463 columns: vec![
1464 AddColumnRequest {
1465 column_schema: new_tag,
1466 is_key: true,
1467 location: None,
1468 add_if_not_exists: false,
1469 },
1470 AddColumnRequest {
1471 column_schema: new_field,
1472 is_key: false,
1473 location: None,
1474 add_if_not_exists: false,
1475 },
1476 ],
1477 };
1478
1479 let builder = meta
1480 .builder_with_alter_kind("my_table", &alter_kind)
1481 .unwrap();
1482 builder.build().unwrap()
1483 }
1484
1485 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1486 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1487 let new_field = ColumnSchema::new(
1488 "my_field_after_ts",
1489 ConcreteDataType::string_datatype(),
1490 true,
1491 );
1492 let yet_another_field = ColumnSchema::new(
1493 "yet_another_field_after_ts",
1494 ConcreteDataType::int64_datatype(),
1495 true,
1496 );
1497 let alter_kind = AlterKind::AddColumns {
1498 columns: vec![
1499 AddColumnRequest {
1500 column_schema: new_tag,
1501 is_key: true,
1502 location: Some(AddColumnLocation::First),
1503 add_if_not_exists: false,
1504 },
1505 AddColumnRequest {
1506 column_schema: new_field,
1507 is_key: false,
1508 location: Some(AddColumnLocation::After {
1509 column_name: "ts".to_string(),
1510 }),
1511 add_if_not_exists: false,
1512 },
1513 AddColumnRequest {
1514 column_schema: yet_another_field,
1515 is_key: true,
1516 location: Some(AddColumnLocation::After {
1517 column_name: "ts".to_string(),
1518 }),
1519 add_if_not_exists: false,
1520 },
1521 ],
1522 };
1523
1524 let builder = meta
1525 .builder_with_alter_kind("my_table", &alter_kind)
1526 .unwrap();
1527 builder.build().unwrap()
1528 }
1529
1530 #[test]
1531 fn test_add_columns() {
1532 let schema = Arc::new(new_test_schema());
1533 let meta = TableMetaBuilder::empty()
1534 .schema(schema)
1535 .primary_key_indices(vec![0])
1536 .engine("engine")
1537 .next_column_id(3)
1538 .build()
1539 .unwrap();
1540
1541 let new_meta = add_columns_to_meta(&meta);
1542 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1543
1544 let names: Vec<String> = new_meta
1545 .schema
1546 .column_schemas()
1547 .iter()
1548 .map(|column_schema| column_schema.name.clone())
1549 .collect();
1550 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1551 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1552 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1553 }
1554
1555 #[test]
1556 fn test_add_columns_multiple_times() {
1557 let schema = Arc::new(new_test_schema());
1558 let meta = TableMetaBuilder::empty()
1559 .schema(schema)
1560 .primary_key_indices(vec![0])
1561 .engine("engine")
1562 .next_column_id(3)
1563 .build()
1564 .unwrap();
1565
1566 let alter_kind = AlterKind::AddColumns {
1567 columns: vec![
1568 AddColumnRequest {
1569 column_schema: ColumnSchema::new(
1570 "col3",
1571 ConcreteDataType::int32_datatype(),
1572 true,
1573 ),
1574 is_key: true,
1575 location: None,
1576 add_if_not_exists: true,
1577 },
1578 AddColumnRequest {
1579 column_schema: ColumnSchema::new(
1580 "col3",
1581 ConcreteDataType::int32_datatype(),
1582 true,
1583 ),
1584 is_key: true,
1585 location: None,
1586 add_if_not_exists: true,
1587 },
1588 ],
1589 };
1590 let err = meta
1591 .builder_with_alter_kind("my_table", &alter_kind)
1592 .err()
1593 .unwrap();
1594 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1595 }
1596
1597 #[test]
1598 fn test_remove_columns() {
1599 let schema = Arc::new(new_test_schema());
1600 let meta = TableMetaBuilder::empty()
1601 .schema(schema.clone())
1602 .primary_key_indices(vec![0])
1603 .engine("engine")
1604 .next_column_id(3)
1605 .build()
1606 .unwrap();
1607 let meta = add_columns_to_meta(&meta);
1609
1610 let alter_kind = AlterKind::DropColumns {
1611 names: vec![String::from("col2"), String::from("my_field")],
1612 };
1613 let new_meta = meta
1614 .builder_with_alter_kind("my_table", &alter_kind)
1615 .unwrap()
1616 .build()
1617 .unwrap();
1618
1619 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1620
1621 let names: Vec<String> = new_meta
1622 .schema
1623 .column_schemas()
1624 .iter()
1625 .map(|column_schema| column_schema.name.clone())
1626 .collect();
1627 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1628 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1629 assert_eq!(&[1], &new_meta.value_indices[..]);
1630 assert_eq!(
1631 schema.timestamp_column(),
1632 new_meta.schema.timestamp_column()
1633 );
1634 }
1635
1636 #[test]
1637 fn test_remove_multiple_columns_before_timestamp() {
1638 let column_schemas = vec![
1639 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1640 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1641 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1642 ColumnSchema::new(
1643 "ts",
1644 ConcreteDataType::timestamp_millisecond_datatype(),
1645 false,
1646 )
1647 .with_time_index(true),
1648 ];
1649 let schema = Arc::new(
1650 SchemaBuilder::try_from(column_schemas)
1651 .unwrap()
1652 .version(123)
1653 .build()
1654 .unwrap(),
1655 );
1656 let meta = TableMetaBuilder::empty()
1657 .schema(schema.clone())
1658 .primary_key_indices(vec![1])
1659 .engine("engine")
1660 .next_column_id(4)
1661 .build()
1662 .unwrap();
1663
1664 let alter_kind = AlterKind::DropColumns {
1666 names: vec![String::from("col3"), String::from("col1")],
1667 };
1668 let new_meta = meta
1669 .builder_with_alter_kind("my_table", &alter_kind)
1670 .unwrap()
1671 .build()
1672 .unwrap();
1673
1674 let names: Vec<String> = new_meta
1675 .schema
1676 .column_schemas()
1677 .iter()
1678 .map(|column_schema| column_schema.name.clone())
1679 .collect();
1680 assert_eq!(&["col2", "ts"], &names[..]);
1681 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1682 assert_eq!(&[1], &new_meta.value_indices[..]);
1683 assert_eq!(
1684 schema.timestamp_column(),
1685 new_meta.schema.timestamp_column()
1686 );
1687 }
1688
1689 #[test]
1690 fn test_add_existing_column() {
1691 let schema = Arc::new(new_test_schema());
1692 let meta = TableMetaBuilder::empty()
1693 .schema(schema)
1694 .primary_key_indices(vec![0])
1695 .engine("engine")
1696 .next_column_id(3)
1697 .build()
1698 .unwrap();
1699
1700 let alter_kind = AlterKind::AddColumns {
1701 columns: vec![AddColumnRequest {
1702 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1703 is_key: false,
1704 location: None,
1705 add_if_not_exists: false,
1706 }],
1707 };
1708
1709 let err = meta
1710 .builder_with_alter_kind("my_table", &alter_kind)
1711 .err()
1712 .unwrap();
1713 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1714
1715 let alter_kind = AlterKind::AddColumns {
1717 columns: vec![AddColumnRequest {
1718 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1719 is_key: true,
1720 location: None,
1721 add_if_not_exists: true,
1722 }],
1723 };
1724 let new_meta = meta
1725 .builder_with_alter_kind("my_table", &alter_kind)
1726 .unwrap()
1727 .build()
1728 .unwrap();
1729 assert_eq!(
1730 meta.schema.column_schemas(),
1731 new_meta.schema.column_schemas()
1732 );
1733 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1734 }
1735
1736 #[test]
1737 fn test_add_different_type_column() {
1738 let schema = Arc::new(new_test_schema());
1739 let meta = TableMetaBuilder::empty()
1740 .schema(schema)
1741 .primary_key_indices(vec![0])
1742 .engine("engine")
1743 .next_column_id(3)
1744 .build()
1745 .unwrap();
1746
1747 let alter_kind = AlterKind::AddColumns {
1749 columns: vec![AddColumnRequest {
1750 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1751 is_key: false,
1752 location: None,
1753 add_if_not_exists: true,
1754 }],
1755 };
1756 let err = meta
1757 .builder_with_alter_kind("my_table", &alter_kind)
1758 .err()
1759 .unwrap();
1760 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1761 }
1762
1763 #[test]
1764 fn test_add_invalid_column() {
1765 let schema = Arc::new(new_test_schema());
1766 let meta = TableMetaBuilder::empty()
1767 .schema(schema)
1768 .primary_key_indices(vec![0])
1769 .engine("engine")
1770 .next_column_id(3)
1771 .build()
1772 .unwrap();
1773
1774 let alter_kind = AlterKind::AddColumns {
1776 columns: vec![AddColumnRequest {
1777 column_schema: ColumnSchema::new(
1778 "weny",
1779 ConcreteDataType::string_datatype(),
1780 false,
1781 ),
1782 is_key: false,
1783 location: None,
1784 add_if_not_exists: false,
1785 }],
1786 };
1787
1788 let err = meta
1789 .builder_with_alter_kind("my_table", &alter_kind)
1790 .err()
1791 .unwrap();
1792 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1793 }
1794
1795 #[test]
1796 fn test_remove_unknown_column() {
1797 let schema = Arc::new(new_test_schema());
1798 let meta = TableMetaBuilder::empty()
1799 .schema(schema)
1800 .primary_key_indices(vec![0])
1801 .engine("engine")
1802 .next_column_id(3)
1803 .build()
1804 .unwrap();
1805
1806 let alter_kind = AlterKind::DropColumns {
1807 names: vec![String::from("unknown")],
1808 };
1809
1810 let err = meta
1811 .builder_with_alter_kind("my_table", &alter_kind)
1812 .err()
1813 .unwrap();
1814 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1815 }
1816
1817 #[test]
1818 fn test_change_unknown_column_data_type() {
1819 let schema = Arc::new(new_test_schema());
1820 let meta = TableMetaBuilder::empty()
1821 .schema(schema)
1822 .primary_key_indices(vec![0])
1823 .engine("engine")
1824 .next_column_id(3)
1825 .build()
1826 .unwrap();
1827
1828 let alter_kind = AlterKind::ModifyColumnTypes {
1829 columns: vec![ModifyColumnTypeRequest {
1830 column_name: "unknown".to_string(),
1831 target_type: ConcreteDataType::string_datatype(),
1832 }],
1833 };
1834
1835 let err = meta
1836 .builder_with_alter_kind("my_table", &alter_kind)
1837 .err()
1838 .unwrap();
1839 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1840 }
1841
1842 #[test]
1843 fn test_remove_key_column() {
1844 let schema = Arc::new(new_test_schema());
1845 let meta = TableMetaBuilder::empty()
1846 .schema(schema)
1847 .primary_key_indices(vec![0])
1848 .engine("engine")
1849 .next_column_id(3)
1850 .build()
1851 .unwrap();
1852
1853 let alter_kind = AlterKind::DropColumns {
1855 names: vec![String::from("col1")],
1856 };
1857
1858 let err = meta
1859 .builder_with_alter_kind("my_table", &alter_kind)
1860 .err()
1861 .unwrap();
1862 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1863
1864 let alter_kind = AlterKind::DropColumns {
1866 names: vec![String::from("ts")],
1867 };
1868
1869 let err = meta
1870 .builder_with_alter_kind("my_table", &alter_kind)
1871 .err()
1872 .unwrap();
1873 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1874 }
1875
1876 #[test]
1877 fn test_remove_partition_column() {
1878 let schema = Arc::new(new_test_schema());
1879 let meta = TableMetaBuilder::empty()
1880 .schema(schema)
1881 .primary_key_indices(vec![])
1882 .partition_key_indices(vec![0])
1883 .engine("engine")
1884 .next_column_id(3)
1885 .build()
1886 .unwrap();
1887 let alter_kind = AlterKind::DropColumns {
1889 names: vec![String::from("col1")],
1890 };
1891
1892 let err = meta
1893 .builder_with_alter_kind("my_table", &alter_kind)
1894 .err()
1895 .unwrap();
1896 assert_matches!(err, Error::RemovePartitionColumn { .. });
1897 }
1898
1899 #[test]
1900 fn test_change_key_column_data_type() {
1901 let schema = Arc::new(new_test_schema());
1902 let meta = TableMetaBuilder::empty()
1903 .schema(schema)
1904 .primary_key_indices(vec![0])
1905 .engine("engine")
1906 .next_column_id(3)
1907 .build()
1908 .unwrap();
1909
1910 let alter_kind = AlterKind::ModifyColumnTypes {
1912 columns: vec![ModifyColumnTypeRequest {
1913 column_name: "col1".to_string(),
1914 target_type: ConcreteDataType::string_datatype(),
1915 }],
1916 };
1917
1918 let err = meta
1919 .builder_with_alter_kind("my_table", &alter_kind)
1920 .err()
1921 .unwrap();
1922 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1923
1924 let alter_kind = AlterKind::ModifyColumnTypes {
1926 columns: vec![ModifyColumnTypeRequest {
1927 column_name: "ts".to_string(),
1928 target_type: ConcreteDataType::string_datatype(),
1929 }],
1930 };
1931
1932 let err = meta
1933 .builder_with_alter_kind("my_table", &alter_kind)
1934 .err()
1935 .unwrap();
1936 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1937 }
1938
1939 #[test]
1940 fn test_alloc_new_column() {
1941 let schema = Arc::new(new_test_schema());
1942 let mut meta = TableMetaBuilder::empty()
1943 .schema(schema)
1944 .primary_key_indices(vec![0])
1945 .engine("engine")
1946 .next_column_id(3)
1947 .build()
1948 .unwrap();
1949 assert_eq!(3, meta.next_column_id);
1950
1951 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1952 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1953
1954 assert_eq!(4, meta.next_column_id);
1955 assert_eq!(column_schema.name, desc.name);
1956 }
1957
1958 #[test]
1959 fn test_add_columns_with_location() {
1960 let schema = Arc::new(new_test_schema());
1961 let meta = TableMetaBuilder::empty()
1962 .schema(schema)
1963 .primary_key_indices(vec![0])
1964 .partition_key_indices(vec![0, 2])
1966 .engine("engine")
1967 .next_column_id(3)
1968 .build()
1969 .unwrap();
1970
1971 let new_meta = add_columns_to_meta_with_location(&meta);
1972 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1973
1974 let names: Vec<String> = new_meta
1975 .schema
1976 .column_schemas()
1977 .iter()
1978 .map(|column_schema| column_schema.name.clone())
1979 .collect();
1980 assert_eq!(
1981 &[
1982 "my_tag_first", "col1", "ts", "yet_another_field_after_ts", "my_field_after_ts", "col2", ],
1989 &names[..]
1990 );
1991 assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
1992 assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
1993 assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
1994 }
1995
1996 #[test]
1997 fn test_modify_column_fulltext_options() {
1998 let schema = Arc::new(new_test_schema());
1999 let meta = TableMetaBuilder::empty()
2000 .schema(schema)
2001 .primary_key_indices(vec![0])
2002 .engine("engine")
2003 .next_column_id(3)
2004 .build()
2005 .unwrap();
2006
2007 let alter_kind = AlterKind::SetIndexes {
2008 options: vec![SetIndexOption::Fulltext {
2009 column_name: "col1".to_string(),
2010 options: FulltextOptions::default(),
2011 }],
2012 };
2013 let err = meta
2014 .builder_with_alter_kind("my_table", &alter_kind)
2015 .err()
2016 .unwrap();
2017 assert_eq!(
2018 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2019 err.to_string()
2020 );
2021
2022 let new_meta = add_columns_to_meta_with_location(&meta);
2024 assert_eq!(meta.region_numbers, new_meta.region_numbers);
2025
2026 let alter_kind = AlterKind::SetIndexes {
2027 options: vec![SetIndexOption::Fulltext {
2028 column_name: "my_tag_first".to_string(),
2029 options: FulltextOptions::new_unchecked(
2030 true,
2031 FulltextAnalyzer::Chinese,
2032 true,
2033 FulltextBackend::Bloom,
2034 1000,
2035 0.01,
2036 ),
2037 }],
2038 };
2039 let new_meta = new_meta
2040 .builder_with_alter_kind("my_table", &alter_kind)
2041 .unwrap()
2042 .build()
2043 .unwrap();
2044 let column_schema = new_meta
2045 .schema
2046 .column_schema_by_name("my_tag_first")
2047 .unwrap();
2048 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2049 assert!(fulltext_options.enable);
2050 assert_eq!(
2051 datatypes::schema::FulltextAnalyzer::Chinese,
2052 fulltext_options.analyzer
2053 );
2054 assert!(fulltext_options.case_sensitive);
2055
2056 let alter_kind = AlterKind::UnsetIndexes {
2057 options: vec![UnsetIndexOption::Fulltext {
2058 column_name: "my_tag_first".to_string(),
2059 }],
2060 };
2061 let new_meta = new_meta
2062 .builder_with_alter_kind("my_table", &alter_kind)
2063 .unwrap()
2064 .build()
2065 .unwrap();
2066 let column_schema = new_meta
2067 .schema
2068 .column_schema_by_name("my_tag_first")
2069 .unwrap();
2070 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2071 assert!(!fulltext_options.enable);
2072 }
2073}