1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26 SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39 TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50 Unsupported,
52 Inexact,
57 Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64 fn from(value: TableProviderFilterPushDown) -> Self {
65 match value {
66 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69 }
70 }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74 fn from(value: FilterPushDownType) -> Self {
75 match value {
76 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79 }
80 }
81}
82
83#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86 Base,
88 View,
90 Temporary,
92}
93
94impl std::fmt::Display for TableType {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 TableType::Base => f.write_str("BASE TABLE"),
98 TableType::Temporary => f.write_str("TEMPORARY"),
99 TableType::View => f.write_str("VIEW"),
100 }
101 }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105 fn from(t: TableType) -> datafusion::datasource::TableType {
106 match t {
107 TableType::Base => datafusion::datasource::TableType::Base,
108 TableType::View => datafusion::datasource::TableType::View,
109 TableType::Temporary => datafusion::datasource::TableType::Temporary,
110 }
111 }
112}
113
114#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
116pub struct TableIdent {
117 pub table_id: TableId,
119 pub version: TableVersion,
122}
123
124#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
128#[builder(pattern = "mutable", custom_constructor)]
129pub struct TableMeta {
130 pub schema: SchemaRef,
131 pub primary_key_indices: Vec<usize>,
134 #[builder(default = "self.default_value_indices()?")]
135 pub value_indices: Vec<usize>,
136 #[builder(default, setter(into))]
137 pub engine: String,
138 #[builder(default, setter(into))]
139 pub region_numbers: Vec<u32>,
140 pub next_column_id: ColumnId,
141 #[builder(default)]
143 pub options: TableOptions,
144 #[builder(default = "Utc::now()")]
145 pub created_on: DateTime<Utc>,
146 #[builder(default = "Vec::new()")]
147 pub partition_key_indices: Vec<usize>,
148 #[builder(default = "Vec::new()")]
149 pub column_ids: Vec<ColumnId>,
150}
151
152impl TableMetaBuilder {
153 #[cfg(any(test, feature = "testing"))]
155 pub fn empty() -> Self {
156 Self {
157 schema: None,
158 primary_key_indices: None,
159 value_indices: None,
160 engine: None,
161 region_numbers: None,
162 next_column_id: None,
163 options: None,
164 created_on: None,
165 partition_key_indices: None,
166 column_ids: None,
167 }
168 }
169}
170
171impl TableMetaBuilder {
172 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
173 match (&self.primary_key_indices, &self.schema) {
174 (Some(v), Some(schema)) => {
175 let column_schemas = schema.column_schemas();
176 Ok((0..column_schemas.len())
177 .filter(|idx| !v.contains(idx))
178 .collect())
179 }
180 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
181 }
182 }
183
184 pub fn new_external_table() -> Self {
185 Self {
186 schema: None,
187 primary_key_indices: Some(Vec::new()),
188 value_indices: Some(Vec::new()),
189 engine: None,
190 region_numbers: Some(Vec::new()),
191 next_column_id: Some(0),
192 options: None,
193 created_on: None,
194 partition_key_indices: None,
195 column_ids: None,
196 }
197 }
198}
199
200struct SplitResult<'a> {
202 columns_at_first: Vec<&'a AddColumnRequest>,
204 columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
206 columns_at_last: Vec<&'a AddColumnRequest>,
208 column_names: Vec<String>,
210}
211
212impl TableMeta {
213 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
214 let columns_schemas = &self.schema.column_schemas();
215 self.primary_key_indices
216 .iter()
217 .map(|idx| &columns_schemas[*idx].name)
218 }
219
220 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
221 let columns_schemas = self.schema.column_schemas();
223 let primary_key_indices = &self.primary_key_indices;
224 columns_schemas
225 .iter()
226 .enumerate()
227 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
228 .map(|(_, cs)| &cs.name)
229 }
230
231 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
232 let columns_schemas = &self.schema.column_schemas();
233 self.partition_key_indices
234 .iter()
235 .map(|idx| &columns_schemas[*idx].name)
236 }
237
238 pub fn builder_with_alter_kind(
242 &self,
243 table_name: &str,
244 alter_kind: &AlterKind,
245 ) -> Result<TableMetaBuilder> {
246 match alter_kind {
247 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
248 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
249 AlterKind::ModifyColumnTypes { columns } => {
250 self.modify_column_types(table_name, columns)
251 }
252 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
254 AlterKind::SetTableOptions { options } => self.set_table_options(options),
255 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
256 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
257 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
258 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
259 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
260 }
261 }
262
263 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
265 let mut new_options = self.options.clone();
266
267 for request in requests {
268 match request {
269 SetRegionOption::Ttl(new_ttl) => {
270 new_options.ttl = *new_ttl;
271 }
272 SetRegionOption::Twsc(key, value) => {
273 if !value.is_empty() {
274 new_options.extra_options.insert(key.clone(), value.clone());
275 new_options.extra_options.insert(
277 COMPACTION_TYPE.to_string(),
278 COMPACTION_TYPE_TWCS.to_string(),
279 );
280 } else {
281 new_options.extra_options.remove(key.as_str());
283 }
284 }
285 }
286 }
287 let mut builder = self.new_meta_builder();
288 builder.options(new_options);
289
290 Ok(builder)
291 }
292
293 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
294 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
295 self.set_table_options(&requests)
296 }
297
298 fn set_indexes(
299 &self,
300 table_name: &str,
301 requests: &[SetIndexOption],
302 ) -> Result<TableMetaBuilder> {
303 let table_schema = &self.schema;
304 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
305 for request in requests {
306 let column_name = request.column_name();
307 table_schema
308 .column_index_by_name(column_name)
309 .with_context(|| error::ColumnNotExistsSnafu {
310 column_name,
311 table_name,
312 })?;
313 set_index_options
314 .entry(column_name)
315 .or_default()
316 .push(request);
317 }
318
319 let mut meta_builder = self.new_meta_builder();
320 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
321 for mut column in table_schema.column_schemas().iter().cloned() {
322 if let Some(request) = set_index_options.get(column.name.as_str()) {
323 for request in request {
324 self.set_index(&mut column, request)?;
325 }
326 }
327 columns.push(column);
328 }
329
330 let mut builder = SchemaBuilder::try_from_columns(columns)
331 .with_context(|_| error::SchemaBuildSnafu {
332 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
333 })?
334 .version(table_schema.version() + 1);
335
336 for (k, v) in table_schema.metadata().iter() {
337 builder = builder.add_metadata(k, v);
338 }
339
340 let new_schema = builder.build().with_context(|_| {
341 let column_names = requests
342 .iter()
343 .map(|request| request.column_name())
344 .collect::<Vec<_>>();
345 error::SchemaBuildSnafu {
346 msg: format!(
347 "Table {table_name} cannot set index options with columns {column_names:?}",
348 ),
349 }
350 })?;
351 let _ = meta_builder
352 .schema(Arc::new(new_schema))
353 .primary_key_indices(self.primary_key_indices.clone());
354
355 Ok(meta_builder)
356 }
357
358 fn unset_indexes(
359 &self,
360 table_name: &str,
361 requests: &[UnsetIndexOption],
362 ) -> Result<TableMetaBuilder> {
363 let table_schema = &self.schema;
364 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
365 for request in requests {
366 let column_name = request.column_name();
367 table_schema
368 .column_index_by_name(column_name)
369 .with_context(|| error::ColumnNotExistsSnafu {
370 column_name,
371 table_name,
372 })?;
373 set_index_options
374 .entry(column_name)
375 .or_default()
376 .push(request);
377 }
378
379 let mut meta_builder = self.new_meta_builder();
380 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
381 for mut column in table_schema.column_schemas().iter().cloned() {
382 if let Some(request) = set_index_options.get(column.name.as_str()) {
383 for request in request {
384 self.unset_index(&mut column, request)?;
385 }
386 }
387 columns.push(column);
388 }
389
390 let mut builder = SchemaBuilder::try_from_columns(columns)
391 .with_context(|_| error::SchemaBuildSnafu {
392 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
393 })?
394 .version(table_schema.version() + 1);
395
396 for (k, v) in table_schema.metadata().iter() {
397 builder = builder.add_metadata(k, v);
398 }
399
400 let new_schema = builder.build().with_context(|_| {
401 let column_names = requests
402 .iter()
403 .map(|request| request.column_name())
404 .collect::<Vec<_>>();
405 error::SchemaBuildSnafu {
406 msg: format!(
407 "Table {table_name} cannot set index options with columns {column_names:?}",
408 ),
409 }
410 })?;
411 let _ = meta_builder
412 .schema(Arc::new(new_schema))
413 .primary_key_indices(self.primary_key_indices.clone());
414
415 Ok(meta_builder)
416 }
417
418 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
419 match request {
420 SetIndexOption::Fulltext {
421 column_name,
422 options,
423 } => {
424 ensure!(
425 column_schema.data_type.is_string(),
426 error::InvalidColumnOptionSnafu {
427 column_name,
428 msg: "FULLTEXT index only supports string type",
429 }
430 );
431
432 let current_fulltext_options = column_schema
433 .fulltext_options()
434 .context(error::SetFulltextOptionsSnafu { column_name })?;
435 set_column_fulltext_options(
436 column_schema,
437 column_name,
438 options,
439 current_fulltext_options,
440 )?;
441 }
442 SetIndexOption::Inverted { column_name } => {
443 debug_assert_eq!(column_schema.name, *column_name);
444 column_schema.set_inverted_index(true);
445 }
446 SetIndexOption::Skipping {
447 column_name,
448 options,
449 } => {
450 set_column_skipping_index_options(column_schema, column_name, options)?;
451 }
452 }
453
454 Ok(())
455 }
456
457 fn unset_index(
458 &self,
459 column_schema: &mut ColumnSchema,
460 request: &UnsetIndexOption,
461 ) -> Result<()> {
462 match request {
463 UnsetIndexOption::Fulltext { column_name } => {
464 let current_fulltext_options = column_schema
465 .fulltext_options()
466 .context(error::SetFulltextOptionsSnafu { column_name })?;
467 unset_column_fulltext_options(
468 column_schema,
469 column_name,
470 current_fulltext_options.clone(),
471 )?
472 }
473 UnsetIndexOption::Inverted { .. } => {
474 column_schema.set_inverted_index(false);
475 }
476 UnsetIndexOption::Skipping { column_name } => {
477 unset_column_skipping_index_options(column_schema, column_name)?;
478 }
479 }
480
481 Ok(())
482 }
483
484 pub fn alloc_new_column(
489 &mut self,
490 table_name: &str,
491 new_column: &ColumnSchema,
492 ) -> Result<ColumnDescriptor> {
493 let desc = ColumnDescriptorBuilder::new(
494 self.next_column_id as ColumnId,
495 &new_column.name,
496 new_column.data_type.clone(),
497 )
498 .is_nullable(new_column.is_nullable())
499 .default_constraint(new_column.default_constraint().cloned())
500 .build()
501 .context(error::BuildColumnDescriptorSnafu {
502 table_name,
503 column_name: &new_column.name,
504 })?;
505
506 self.next_column_id += 1;
508
509 Ok(desc)
510 }
511
512 fn new_meta_builder(&self) -> TableMetaBuilder {
514 let mut builder = TableMetaBuilder::from(self);
515 builder.value_indices = None;
517 builder
518 }
519
520 fn add_columns(
522 &self,
523 table_name: &str,
524 requests: &[AddColumnRequest],
525 ) -> Result<TableMetaBuilder> {
526 let table_schema = &self.schema;
527 let mut meta_builder = self.new_meta_builder();
528 let original_primary_key_indices: HashSet<&usize> =
529 self.primary_key_indices.iter().collect();
530
531 let mut names = HashSet::with_capacity(requests.len());
532 let mut new_columns = Vec::with_capacity(requests.len());
533 for col_to_add in requests {
534 if let Some(column_schema) =
535 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
536 {
537 ensure!(
539 col_to_add.add_if_not_exists,
540 error::ColumnExistsSnafu {
541 table_name,
542 column_name: &col_to_add.column_schema.name
543 },
544 );
545
546 ensure!(
548 column_schema.data_type == col_to_add.column_schema.data_type,
549 error::InvalidAlterRequestSnafu {
550 table: table_name,
551 err: format!(
552 "column {} already exists with different type {:?}",
553 col_to_add.column_schema.name, column_schema.data_type,
554 ),
555 }
556 );
557 } else {
558 ensure!(
561 names.insert(&col_to_add.column_schema.name),
562 error::InvalidAlterRequestSnafu {
563 table: table_name,
564 err: format!(
565 "add column {} more than once",
566 col_to_add.column_schema.name
567 ),
568 }
569 );
570
571 ensure!(
572 col_to_add.column_schema.is_nullable()
573 || col_to_add.column_schema.default_constraint().is_some(),
574 error::InvalidAlterRequestSnafu {
575 table: table_name,
576 err: format!(
577 "no default value for column {}",
578 col_to_add.column_schema.name
579 ),
580 },
581 );
582
583 new_columns.push(col_to_add.clone());
584 }
585 }
586 let requests = &new_columns[..];
587
588 let SplitResult {
589 columns_at_first,
590 columns_at_after,
591 columns_at_last,
592 column_names,
593 } = self.split_requests_by_column_location(table_name, requests)?;
594 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
595 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
596 columns_at_first.iter().rev().for_each(|request| {
598 if request.is_key {
599 primary_key_indices.push(columns.len());
601 }
602 columns.push(request.column_schema.clone());
603 });
604 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
606 if original_primary_key_indices.contains(&index) {
607 primary_key_indices.push(columns.len());
608 }
609 columns.push(column_schema.clone());
610 if let Some(requests) = columns_at_after.get(&column_schema.name) {
611 requests.iter().rev().for_each(|request| {
612 if request.is_key {
613 primary_key_indices.push(columns.len());
615 }
616 columns.push(request.column_schema.clone());
617 });
618 }
619 }
620 columns_at_last.iter().for_each(|request| {
622 if request.is_key {
623 primary_key_indices.push(columns.len());
625 }
626 columns.push(request.column_schema.clone());
627 });
628
629 let mut builder = SchemaBuilder::try_from(columns)
630 .with_context(|_| error::SchemaBuildSnafu {
631 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
632 })?
633 .version(table_schema.version() + 1);
635 for (k, v) in table_schema.metadata().iter() {
636 builder = builder.add_metadata(k, v);
637 }
638 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
639 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
640 })?;
641
642 let partition_key_indices = self
643 .partition_key_indices
644 .iter()
645 .map(|idx| table_schema.column_name_by_index(*idx))
646 .map(|name| new_schema.column_index_by_name(name).unwrap())
648 .collect();
649
650 let _ = meta_builder
652 .schema(Arc::new(new_schema))
653 .primary_key_indices(primary_key_indices)
654 .partition_key_indices(partition_key_indices);
655
656 Ok(meta_builder)
657 }
658
659 fn remove_columns(
660 &self,
661 table_name: &str,
662 column_names: &[String],
663 ) -> Result<TableMetaBuilder> {
664 let table_schema = &self.schema;
665 let column_names: HashSet<_> = column_names.iter().collect();
666 let mut meta_builder = self.new_meta_builder();
667
668 let timestamp_index = table_schema.timestamp_index();
669 for column_name in &column_names {
671 if let Some(index) = table_schema.column_index_by_name(column_name) {
672 ensure!(
675 !self.primary_key_indices.contains(&index),
676 error::RemoveColumnInIndexSnafu {
677 column_name: *column_name,
678 table_name,
679 }
680 );
681
682 ensure!(
683 !self.partition_key_indices.contains(&index),
684 error::RemovePartitionColumnSnafu {
685 column_name: *column_name,
686 table_name,
687 }
688 );
689
690 if let Some(ts_index) = timestamp_index {
691 ensure!(
693 index != ts_index,
694 error::RemoveColumnInIndexSnafu {
695 column_name: table_schema.column_name_by_index(ts_index),
696 table_name,
697 }
698 );
699 }
700 } else {
701 return error::ColumnNotExistsSnafu {
702 column_name: *column_name,
703 table_name,
704 }
705 .fail()?;
706 }
707 }
708
709 let columns: Vec<_> = table_schema
711 .column_schemas()
712 .iter()
713 .filter(|column_schema| !column_names.contains(&column_schema.name))
714 .cloned()
715 .collect();
716
717 let mut builder = SchemaBuilder::try_from_columns(columns)
718 .with_context(|_| error::SchemaBuildSnafu {
719 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
720 })?
721 .version(table_schema.version() + 1);
723 for (k, v) in table_schema.metadata().iter() {
724 builder = builder.add_metadata(k, v);
725 }
726 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
727 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
728 })?;
729
730 let primary_key_indices = self
732 .primary_key_indices
733 .iter()
734 .map(|idx| table_schema.column_name_by_index(*idx))
735 .map(|name| new_schema.column_index_by_name(name).unwrap())
737 .collect();
738
739 let partition_key_indices = self
740 .partition_key_indices
741 .iter()
742 .map(|idx| table_schema.column_name_by_index(*idx))
743 .map(|name| new_schema.column_index_by_name(name).unwrap())
745 .collect();
746
747 let _ = meta_builder
748 .schema(Arc::new(new_schema))
749 .primary_key_indices(primary_key_indices)
750 .partition_key_indices(partition_key_indices);
751
752 Ok(meta_builder)
753 }
754
755 fn modify_column_types(
756 &self,
757 table_name: &str,
758 requests: &[ModifyColumnTypeRequest],
759 ) -> Result<TableMetaBuilder> {
760 let table_schema = &self.schema;
761 let mut meta_builder = self.new_meta_builder();
762
763 let mut modify_column_types = HashMap::with_capacity(requests.len());
764 let timestamp_index = table_schema.timestamp_index();
765
766 for col_to_change in requests {
767 let change_column_name = &col_to_change.column_name;
768
769 let index = table_schema
770 .column_index_by_name(change_column_name)
771 .with_context(|| error::ColumnNotExistsSnafu {
772 column_name: change_column_name,
773 table_name,
774 })?;
775
776 let column = &table_schema.column_schemas()[index];
777
778 ensure!(
779 !self.primary_key_indices.contains(&index),
780 error::InvalidAlterRequestSnafu {
781 table: table_name,
782 err: format!(
783 "Not allowed to change primary key index column '{}'",
784 column.name
785 )
786 }
787 );
788
789 if let Some(ts_index) = timestamp_index {
790 ensure!(
792 index != ts_index,
793 error::InvalidAlterRequestSnafu {
794 table: table_name,
795 err: format!(
796 "Not allowed to change timestamp index column '{}' datatype",
797 column.name
798 )
799 }
800 );
801 }
802
803 ensure!(
804 modify_column_types
805 .insert(&col_to_change.column_name, col_to_change)
806 .is_none(),
807 error::InvalidAlterRequestSnafu {
808 table: table_name,
809 err: format!(
810 "change column datatype {} more than once",
811 col_to_change.column_name
812 ),
813 }
814 );
815
816 ensure!(
817 column
818 .data_type
819 .can_arrow_type_cast_to(&col_to_change.target_type),
820 error::InvalidAlterRequestSnafu {
821 table: table_name,
822 err: format!(
823 "column '{}' cannot be cast automatically to type '{}'",
824 col_to_change.column_name, col_to_change.target_type,
825 ),
826 }
827 );
828
829 ensure!(
830 column.is_nullable(),
831 error::InvalidAlterRequestSnafu {
832 table: table_name,
833 err: format!(
834 "column '{}' must be nullable to ensure safe conversion.",
835 col_to_change.column_name,
836 ),
837 }
838 );
839 }
840 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
843 for mut column in table_schema.column_schemas().iter().cloned() {
844 if let Some(change_column) = modify_column_types.get(&column.name) {
845 column.data_type = change_column.target_type.clone();
846 let new_default = if let Some(default_value) = column.default_constraint() {
847 Some(
848 default_value
849 .cast_to_datatype(&change_column.target_type)
850 .with_context(|_| error::CastDefaultValueSnafu {
851 reason: format!(
852 "Failed to cast default value from {:?} to type {:?}",
853 default_value, &change_column.target_type
854 ),
855 })?,
856 )
857 } else {
858 None
859 };
860 column = column
861 .clone()
862 .with_default_constraint(new_default.clone())
863 .with_context(|_| error::CastDefaultValueSnafu {
864 reason: format!("Failed to set new default: {:?}", new_default),
865 })?;
866 }
867 columns.push(column)
868 }
869
870 let mut builder = SchemaBuilder::try_from_columns(columns)
871 .with_context(|_| error::SchemaBuildSnafu {
872 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
873 })?
874 .version(table_schema.version() + 1);
876 for (k, v) in table_schema.metadata().iter() {
877 builder = builder.add_metadata(k, v);
878 }
879 let new_schema = builder.build().with_context(|_| {
880 let column_names: Vec<_> = requests
881 .iter()
882 .map(|request| &request.column_name)
883 .collect();
884
885 error::SchemaBuildSnafu {
886 msg: format!(
887 "Table {table_name} cannot change datatype with columns {column_names:?}"
888 ),
889 }
890 })?;
891
892 let _ = meta_builder
893 .schema(Arc::new(new_schema))
894 .primary_key_indices(self.primary_key_indices.clone());
895
896 Ok(meta_builder)
897 }
898
899 fn split_requests_by_column_location<'a>(
901 &self,
902 table_name: &str,
903 requests: &'a [AddColumnRequest],
904 ) -> Result<SplitResult<'a>> {
905 let table_schema = &self.schema;
906 let mut columns_at_first = Vec::new();
907 let mut columns_at_after = HashMap::new();
908 let mut columns_at_last = Vec::new();
909 let mut column_names = Vec::with_capacity(requests.len());
910 for request in requests {
911 let column_name = &request.column_schema.name;
913 column_names.push(column_name.clone());
914 ensure!(
915 table_schema.column_schema_by_name(column_name).is_none(),
916 error::ColumnExistsSnafu {
917 column_name,
918 table_name,
919 }
920 );
921 match request.location.as_ref() {
922 Some(AddColumnLocation::First) => {
923 columns_at_first.push(request);
924 }
925 Some(AddColumnLocation::After { column_name }) => {
926 ensure!(
927 table_schema.column_schema_by_name(column_name).is_some(),
928 error::ColumnNotExistsSnafu {
929 column_name,
930 table_name,
931 }
932 );
933 columns_at_after
934 .entry(column_name.clone())
935 .or_insert(Vec::new())
936 .push(request);
937 }
938 None => {
939 columns_at_last.push(request);
940 }
941 }
942 }
943 Ok(SplitResult {
944 columns_at_first,
945 columns_at_after,
946 columns_at_last,
947 column_names,
948 })
949 }
950
951 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
952 let table_schema = &self.schema;
953 let mut meta_builder = self.new_meta_builder();
954 let mut columns = Vec::with_capacity(table_schema.num_columns());
955 for column_schema in table_schema.column_schemas() {
956 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
957 ensure!(
959 column_schema.default_constraint().is_some(),
960 error::InvalidAlterRequestSnafu {
961 table: table_name,
962 err: format!("column {name} does not have a default value"),
963 }
964 );
965 if !column_schema.is_nullable() {
966 return error::InvalidAlterRequestSnafu {
967 table: table_name,
968 err: format!(
969 "column {name} is not nullable and `default` cannot be dropped",
970 ),
971 }
972 .fail();
973 }
974 let new_column_schema = column_schema.clone();
975 let new_column_schema = new_column_schema
976 .with_default_constraint(None)
977 .with_context(|_| error::SchemaBuildSnafu {
978 msg: format!("Table {table_name} cannot drop default values"),
979 })?;
980 columns.push(new_column_schema);
981 } else {
982 columns.push(column_schema.clone());
983 }
984 }
985
986 let mut builder = SchemaBuilder::try_from_columns(columns)
987 .with_context(|_| error::SchemaBuildSnafu {
988 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
989 })?
990 .version(table_schema.version() + 1);
992 for (k, v) in table_schema.metadata().iter() {
993 builder = builder.add_metadata(k, v);
994 }
995 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
996 msg: format!("Table {table_name} cannot drop default values"),
997 })?;
998
999 let _ = meta_builder.schema(Arc::new(new_schema));
1000
1001 Ok(meta_builder)
1002 }
1003
1004 fn set_defaults(
1005 &self,
1006 table_name: &str,
1007 set_defaults: &[SetDefaultRequest],
1008 ) -> Result<TableMetaBuilder> {
1009 let table_schema = &self.schema;
1010 let mut meta_builder = self.new_meta_builder();
1011 let mut columns = Vec::with_capacity(table_schema.num_columns());
1012 for column_schema in table_schema.column_schemas() {
1013 if let Some(set_default) = set_defaults
1014 .iter()
1015 .find(|s| s.column_name == column_schema.name)
1016 {
1017 let new_column_schema = column_schema.clone();
1018 let new_column_schema = new_column_schema
1019 .with_default_constraint(set_default.default_constraint.clone())
1020 .with_context(|_| error::SchemaBuildSnafu {
1021 msg: format!("Table {table_name} cannot set default values"),
1022 })?;
1023 columns.push(new_column_schema);
1024 } else {
1025 columns.push(column_schema.clone());
1026 }
1027 }
1028
1029 let mut builder = SchemaBuilder::try_from_columns(columns)
1030 .with_context(|_| error::SchemaBuildSnafu {
1031 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1032 })?
1033 .version(table_schema.version() + 1);
1035 for (k, v) in table_schema.metadata().iter() {
1036 builder = builder.add_metadata(k, v);
1037 }
1038 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1039 msg: format!("Table {table_name} cannot set default values"),
1040 })?;
1041
1042 let _ = meta_builder.schema(Arc::new(new_schema));
1043
1044 Ok(meta_builder)
1045 }
1046}
1047
1048#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1049#[builder(pattern = "owned")]
1050pub struct TableInfo {
1051 #[builder(default, setter(into))]
1053 pub ident: TableIdent,
1054 #[builder(setter(into))]
1056 pub name: String,
1057 #[builder(default, setter(into))]
1059 pub desc: Option<String>,
1060 #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1061 pub catalog_name: String,
1062 #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1063 pub schema_name: String,
1064 pub meta: TableMeta,
1065 #[builder(default = "TableType::Base")]
1066 pub table_type: TableType,
1067}
1068
1069pub type TableInfoRef = Arc<TableInfo>;
1070
1071impl TableInfo {
1072 pub fn table_id(&self) -> TableId {
1073 self.ident.table_id
1074 }
1075
1076 pub fn region_ids(&self) -> Vec<RegionId> {
1077 self.meta
1078 .region_numbers
1079 .iter()
1080 .map(|id| RegionId::new(self.table_id(), *id))
1081 .collect()
1082 }
1083 pub fn full_table_name(&self) -> String {
1085 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1086 }
1087
1088 pub fn get_db_string(&self) -> String {
1089 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1090 }
1091
1092 pub fn is_physical_table(&self) -> bool {
1094 self.meta
1095 .options
1096 .extra_options
1097 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1098 }
1099
1100 pub fn is_ttl_instant_table(&self) -> bool {
1102 self.meta
1103 .options
1104 .ttl
1105 .map(|t| t.is_instant())
1106 .unwrap_or(false)
1107 }
1108}
1109
1110impl TableInfoBuilder {
1111 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1112 Self {
1113 name: Some(name.into()),
1114 meta: Some(meta),
1115 ..Default::default()
1116 }
1117 }
1118
1119 pub fn table_id(mut self, id: TableId) -> Self {
1120 let ident = self.ident.get_or_insert_with(TableIdent::default);
1121 ident.table_id = id;
1122 self
1123 }
1124
1125 pub fn table_version(mut self, version: TableVersion) -> Self {
1126 let ident = self.ident.get_or_insert_with(TableIdent::default);
1127 ident.version = version;
1128 self
1129 }
1130}
1131
1132impl TableIdent {
1133 pub fn new(table_id: TableId) -> Self {
1134 Self {
1135 table_id,
1136 version: 0,
1137 }
1138 }
1139}
1140
1141impl From<TableId> for TableIdent {
1142 fn from(table_id: TableId) -> Self {
1143 Self::new(table_id)
1144 }
1145}
1146
1147#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1149pub struct RawTableMeta {
1150 pub schema: RawSchema,
1151 pub primary_key_indices: Vec<usize>,
1154 pub value_indices: Vec<usize>,
1157 pub engine: String,
1159 pub next_column_id: ColumnId,
1162 pub region_numbers: Vec<u32>,
1163 pub options: TableOptions,
1164 pub created_on: DateTime<Utc>,
1165 #[serde(default)]
1167 pub partition_key_indices: Vec<usize>,
1168 #[serde(default)]
1171 pub column_ids: Vec<ColumnId>,
1172}
1173
1174impl From<TableMeta> for RawTableMeta {
1175 fn from(meta: TableMeta) -> RawTableMeta {
1176 RawTableMeta {
1177 schema: RawSchema::from(&*meta.schema),
1178 primary_key_indices: meta.primary_key_indices,
1179 value_indices: meta.value_indices,
1180 engine: meta.engine,
1181 next_column_id: meta.next_column_id,
1182 region_numbers: meta.region_numbers,
1183 options: meta.options,
1184 created_on: meta.created_on,
1185 partition_key_indices: meta.partition_key_indices,
1186 column_ids: meta.column_ids,
1187 }
1188 }
1189}
1190
1191impl TryFrom<RawTableMeta> for TableMeta {
1192 type Error = ConvertError;
1193
1194 fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1195 Ok(TableMeta {
1196 schema: Arc::new(Schema::try_from(raw.schema)?),
1197 primary_key_indices: raw.primary_key_indices,
1198 value_indices: raw.value_indices,
1199 engine: raw.engine,
1200 region_numbers: raw.region_numbers,
1201 next_column_id: raw.next_column_id,
1202 options: raw.options,
1203 created_on: raw.created_on,
1204 partition_key_indices: raw.partition_key_indices,
1205 column_ids: raw.column_ids,
1206 })
1207 }
1208}
1209
1210#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1212pub struct RawTableInfo {
1213 pub ident: TableIdent,
1214 pub name: String,
1215 pub desc: Option<String>,
1216 pub catalog_name: String,
1217 pub schema_name: String,
1218 pub meta: RawTableMeta,
1219 pub table_type: TableType,
1220}
1221
1222impl RawTableInfo {
1223 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1227 if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1228 None
1229 } else {
1230 Some(
1231 self.meta
1232 .column_ids
1233 .iter()
1234 .enumerate()
1235 .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1236 .collect(),
1237 )
1238 }
1239 }
1240
1241 pub fn sort_columns(&mut self) {
1243 let column_schemas = &self.meta.schema.column_schemas;
1244 let primary_keys = self
1245 .meta
1246 .primary_key_indices
1247 .iter()
1248 .map(|index| column_schemas[*index].name.clone())
1249 .collect::<HashSet<_>>();
1250
1251 let name_to_ids = self.name_to_ids().unwrap_or_default();
1252 self.meta
1253 .schema
1254 .column_schemas
1255 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1256
1257 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1259 let mut timestamp_index = None;
1260 let mut value_indices =
1261 Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1262 let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1263 for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1264 if primary_keys.contains(&column_schema.name) {
1265 primary_key_indices.push(index);
1266 } else if column_schema.is_time_index() {
1267 value_indices.push(index);
1268 timestamp_index = Some(index);
1269 } else {
1270 value_indices.push(index);
1271 }
1272 if let Some(id) = name_to_ids.get(&column_schema.name) {
1273 column_ids.push(*id);
1274 }
1275 }
1276
1277 self.meta.schema.timestamp_index = timestamp_index;
1279 self.meta.primary_key_indices = primary_key_indices;
1280 self.meta.value_indices = value_indices;
1281 self.meta.column_ids = column_ids;
1282 }
1283
1284 pub fn to_region_options(&self) -> HashMap<String, String> {
1288 HashMap::from(&self.meta.options)
1289 }
1290
1291 pub fn table_ref(&self) -> TableReference<'_> {
1293 TableReference::full(
1294 self.catalog_name.as_str(),
1295 self.schema_name.as_str(),
1296 self.name.as_str(),
1297 )
1298 }
1299}
1300
1301impl From<TableInfo> for RawTableInfo {
1302 fn from(info: TableInfo) -> RawTableInfo {
1303 RawTableInfo {
1304 ident: info.ident,
1305 name: info.name,
1306 desc: info.desc,
1307 catalog_name: info.catalog_name,
1308 schema_name: info.schema_name,
1309 meta: RawTableMeta::from(info.meta),
1310 table_type: info.table_type,
1311 }
1312 }
1313}
1314
1315impl TryFrom<RawTableInfo> for TableInfo {
1316 type Error = ConvertError;
1317
1318 fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1319 Ok(TableInfo {
1320 ident: raw.ident,
1321 name: raw.name,
1322 desc: raw.desc,
1323 catalog_name: raw.catalog_name,
1324 schema_name: raw.schema_name,
1325 meta: TableMeta::try_from(raw.meta)?,
1326 table_type: raw.table_type,
1327 })
1328 }
1329}
1330
1331fn set_column_fulltext_options(
1340 column_schema: &mut ColumnSchema,
1341 column_name: &str,
1342 options: &FulltextOptions,
1343 current_options: Option<FulltextOptions>,
1344) -> Result<()> {
1345 if let Some(current_options) = current_options {
1346 ensure!(
1347 current_options.analyzer == options.analyzer
1348 && current_options.case_sensitive == options.case_sensitive,
1349 error::InvalidColumnOptionSnafu {
1350 column_name,
1351 msg: format!(
1352 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1353 current_options.analyzer, current_options.case_sensitive
1354 ),
1355 }
1356 );
1357 }
1358
1359 column_schema
1360 .set_fulltext_options(options)
1361 .context(error::SetFulltextOptionsSnafu { column_name })?;
1362
1363 Ok(())
1364}
1365
1366fn unset_column_fulltext_options(
1367 column_schema: &mut ColumnSchema,
1368 column_name: &str,
1369 current_options: Option<FulltextOptions>,
1370) -> Result<()> {
1371 ensure!(
1372 current_options
1373 .as_ref()
1374 .is_some_and(|options| options.enable),
1375 error::InvalidColumnOptionSnafu {
1376 column_name,
1377 msg: "FULLTEXT index already disabled".to_string(),
1378 }
1379 );
1380
1381 let mut options = current_options.unwrap();
1382 options.enable = false;
1383 column_schema
1384 .set_fulltext_options(&options)
1385 .context(error::SetFulltextOptionsSnafu { column_name })?;
1386
1387 Ok(())
1388}
1389
1390fn set_column_skipping_index_options(
1391 column_schema: &mut ColumnSchema,
1392 column_name: &str,
1393 options: &SkippingIndexOptions,
1394) -> Result<()> {
1395 column_schema
1396 .set_skipping_options(options)
1397 .context(error::SetSkippingOptionsSnafu { column_name })?;
1398
1399 Ok(())
1400}
1401
1402fn unset_column_skipping_index_options(
1403 column_schema: &mut ColumnSchema,
1404 column_name: &str,
1405) -> Result<()> {
1406 column_schema
1407 .unset_skipping_options()
1408 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1409 Ok(())
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414 use std::assert_matches::assert_matches;
1415
1416 use common_error::ext::ErrorExt;
1417 use common_error::status_code::StatusCode;
1418 use datatypes::data_type::ConcreteDataType;
1419 use datatypes::schema::{
1420 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1421 };
1422
1423 use super::*;
1424 use crate::Error;
1425
1426 fn new_test_schema() -> Schema {
1428 let column_schemas = vec![
1429 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1430 ColumnSchema::new(
1431 "ts",
1432 ConcreteDataType::timestamp_millisecond_datatype(),
1433 false,
1434 )
1435 .with_time_index(true),
1436 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1437 ];
1438 SchemaBuilder::try_from(column_schemas)
1439 .unwrap()
1440 .version(123)
1441 .build()
1442 .unwrap()
1443 }
1444
1445 #[test]
1446 fn test_raw_convert() {
1447 let schema = Arc::new(new_test_schema());
1448 let meta = TableMetaBuilder::empty()
1449 .schema(schema)
1450 .primary_key_indices(vec![0])
1451 .engine("engine")
1452 .next_column_id(3)
1453 .build()
1454 .unwrap();
1455 let info = TableInfoBuilder::default()
1456 .table_id(10)
1457 .table_version(5)
1458 .name("mytable")
1459 .meta(meta)
1460 .build()
1461 .unwrap();
1462
1463 let raw = RawTableInfo::from(info.clone());
1464 let info_new = TableInfo::try_from(raw).unwrap();
1465
1466 assert_eq!(info, info_new);
1467 }
1468
1469 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1470 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1471 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1472 let alter_kind = AlterKind::AddColumns {
1473 columns: vec![
1474 AddColumnRequest {
1475 column_schema: new_tag,
1476 is_key: true,
1477 location: None,
1478 add_if_not_exists: false,
1479 },
1480 AddColumnRequest {
1481 column_schema: new_field,
1482 is_key: false,
1483 location: None,
1484 add_if_not_exists: false,
1485 },
1486 ],
1487 };
1488
1489 let builder = meta
1490 .builder_with_alter_kind("my_table", &alter_kind)
1491 .unwrap();
1492 builder.build().unwrap()
1493 }
1494
1495 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1496 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1497 let new_field = ColumnSchema::new(
1498 "my_field_after_ts",
1499 ConcreteDataType::string_datatype(),
1500 true,
1501 );
1502 let yet_another_field = ColumnSchema::new(
1503 "yet_another_field_after_ts",
1504 ConcreteDataType::int64_datatype(),
1505 true,
1506 );
1507 let alter_kind = AlterKind::AddColumns {
1508 columns: vec![
1509 AddColumnRequest {
1510 column_schema: new_tag,
1511 is_key: true,
1512 location: Some(AddColumnLocation::First),
1513 add_if_not_exists: false,
1514 },
1515 AddColumnRequest {
1516 column_schema: new_field,
1517 is_key: false,
1518 location: Some(AddColumnLocation::After {
1519 column_name: "ts".to_string(),
1520 }),
1521 add_if_not_exists: false,
1522 },
1523 AddColumnRequest {
1524 column_schema: yet_another_field,
1525 is_key: true,
1526 location: Some(AddColumnLocation::After {
1527 column_name: "ts".to_string(),
1528 }),
1529 add_if_not_exists: false,
1530 },
1531 ],
1532 };
1533
1534 let builder = meta
1535 .builder_with_alter_kind("my_table", &alter_kind)
1536 .unwrap();
1537 builder.build().unwrap()
1538 }
1539
1540 #[test]
1541 fn test_add_columns() {
1542 let schema = Arc::new(new_test_schema());
1543 let meta = TableMetaBuilder::empty()
1544 .schema(schema)
1545 .primary_key_indices(vec![0])
1546 .engine("engine")
1547 .next_column_id(3)
1548 .build()
1549 .unwrap();
1550
1551 let new_meta = add_columns_to_meta(&meta);
1552 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1553
1554 let names: Vec<String> = new_meta
1555 .schema
1556 .column_schemas()
1557 .iter()
1558 .map(|column_schema| column_schema.name.clone())
1559 .collect();
1560 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1561 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1562 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1563 }
1564
1565 #[test]
1566 fn test_add_columns_multiple_times() {
1567 let schema = Arc::new(new_test_schema());
1568 let meta = TableMetaBuilder::empty()
1569 .schema(schema)
1570 .primary_key_indices(vec![0])
1571 .engine("engine")
1572 .next_column_id(3)
1573 .build()
1574 .unwrap();
1575
1576 let alter_kind = AlterKind::AddColumns {
1577 columns: vec![
1578 AddColumnRequest {
1579 column_schema: ColumnSchema::new(
1580 "col3",
1581 ConcreteDataType::int32_datatype(),
1582 true,
1583 ),
1584 is_key: true,
1585 location: None,
1586 add_if_not_exists: true,
1587 },
1588 AddColumnRequest {
1589 column_schema: ColumnSchema::new(
1590 "col3",
1591 ConcreteDataType::int32_datatype(),
1592 true,
1593 ),
1594 is_key: true,
1595 location: None,
1596 add_if_not_exists: true,
1597 },
1598 ],
1599 };
1600 let err = meta
1601 .builder_with_alter_kind("my_table", &alter_kind)
1602 .err()
1603 .unwrap();
1604 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1605 }
1606
1607 #[test]
1608 fn test_remove_columns() {
1609 let schema = Arc::new(new_test_schema());
1610 let meta = TableMetaBuilder::empty()
1611 .schema(schema.clone())
1612 .primary_key_indices(vec![0])
1613 .engine("engine")
1614 .next_column_id(3)
1615 .build()
1616 .unwrap();
1617 let meta = add_columns_to_meta(&meta);
1619
1620 let alter_kind = AlterKind::DropColumns {
1621 names: vec![String::from("col2"), String::from("my_field")],
1622 };
1623 let new_meta = meta
1624 .builder_with_alter_kind("my_table", &alter_kind)
1625 .unwrap()
1626 .build()
1627 .unwrap();
1628
1629 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1630
1631 let names: Vec<String> = new_meta
1632 .schema
1633 .column_schemas()
1634 .iter()
1635 .map(|column_schema| column_schema.name.clone())
1636 .collect();
1637 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1638 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1639 assert_eq!(&[1], &new_meta.value_indices[..]);
1640 assert_eq!(
1641 schema.timestamp_column(),
1642 new_meta.schema.timestamp_column()
1643 );
1644 }
1645
1646 #[test]
1647 fn test_remove_multiple_columns_before_timestamp() {
1648 let column_schemas = vec![
1649 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1650 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1651 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1652 ColumnSchema::new(
1653 "ts",
1654 ConcreteDataType::timestamp_millisecond_datatype(),
1655 false,
1656 )
1657 .with_time_index(true),
1658 ];
1659 let schema = Arc::new(
1660 SchemaBuilder::try_from(column_schemas)
1661 .unwrap()
1662 .version(123)
1663 .build()
1664 .unwrap(),
1665 );
1666 let meta = TableMetaBuilder::empty()
1667 .schema(schema.clone())
1668 .primary_key_indices(vec![1])
1669 .engine("engine")
1670 .next_column_id(4)
1671 .build()
1672 .unwrap();
1673
1674 let alter_kind = AlterKind::DropColumns {
1676 names: vec![String::from("col3"), String::from("col1")],
1677 };
1678 let new_meta = meta
1679 .builder_with_alter_kind("my_table", &alter_kind)
1680 .unwrap()
1681 .build()
1682 .unwrap();
1683
1684 let names: Vec<String> = new_meta
1685 .schema
1686 .column_schemas()
1687 .iter()
1688 .map(|column_schema| column_schema.name.clone())
1689 .collect();
1690 assert_eq!(&["col2", "ts"], &names[..]);
1691 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1692 assert_eq!(&[1], &new_meta.value_indices[..]);
1693 assert_eq!(
1694 schema.timestamp_column(),
1695 new_meta.schema.timestamp_column()
1696 );
1697 }
1698
1699 #[test]
1700 fn test_add_existing_column() {
1701 let schema = Arc::new(new_test_schema());
1702 let meta = TableMetaBuilder::empty()
1703 .schema(schema)
1704 .primary_key_indices(vec![0])
1705 .engine("engine")
1706 .next_column_id(3)
1707 .build()
1708 .unwrap();
1709
1710 let alter_kind = AlterKind::AddColumns {
1711 columns: vec![AddColumnRequest {
1712 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1713 is_key: false,
1714 location: None,
1715 add_if_not_exists: false,
1716 }],
1717 };
1718
1719 let err = meta
1720 .builder_with_alter_kind("my_table", &alter_kind)
1721 .err()
1722 .unwrap();
1723 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1724
1725 let alter_kind = AlterKind::AddColumns {
1727 columns: vec![AddColumnRequest {
1728 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1729 is_key: true,
1730 location: None,
1731 add_if_not_exists: true,
1732 }],
1733 };
1734 let new_meta = meta
1735 .builder_with_alter_kind("my_table", &alter_kind)
1736 .unwrap()
1737 .build()
1738 .unwrap();
1739 assert_eq!(
1740 meta.schema.column_schemas(),
1741 new_meta.schema.column_schemas()
1742 );
1743 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1744 }
1745
1746 #[test]
1747 fn test_add_different_type_column() {
1748 let schema = Arc::new(new_test_schema());
1749 let meta = TableMetaBuilder::empty()
1750 .schema(schema)
1751 .primary_key_indices(vec![0])
1752 .engine("engine")
1753 .next_column_id(3)
1754 .build()
1755 .unwrap();
1756
1757 let alter_kind = AlterKind::AddColumns {
1759 columns: vec![AddColumnRequest {
1760 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1761 is_key: false,
1762 location: None,
1763 add_if_not_exists: true,
1764 }],
1765 };
1766 let err = meta
1767 .builder_with_alter_kind("my_table", &alter_kind)
1768 .err()
1769 .unwrap();
1770 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1771 }
1772
1773 #[test]
1774 fn test_add_invalid_column() {
1775 let schema = Arc::new(new_test_schema());
1776 let meta = TableMetaBuilder::empty()
1777 .schema(schema)
1778 .primary_key_indices(vec![0])
1779 .engine("engine")
1780 .next_column_id(3)
1781 .build()
1782 .unwrap();
1783
1784 let alter_kind = AlterKind::AddColumns {
1786 columns: vec![AddColumnRequest {
1787 column_schema: ColumnSchema::new(
1788 "weny",
1789 ConcreteDataType::string_datatype(),
1790 false,
1791 ),
1792 is_key: false,
1793 location: None,
1794 add_if_not_exists: false,
1795 }],
1796 };
1797
1798 let err = meta
1799 .builder_with_alter_kind("my_table", &alter_kind)
1800 .err()
1801 .unwrap();
1802 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1803 }
1804
1805 #[test]
1806 fn test_remove_unknown_column() {
1807 let schema = Arc::new(new_test_schema());
1808 let meta = TableMetaBuilder::empty()
1809 .schema(schema)
1810 .primary_key_indices(vec![0])
1811 .engine("engine")
1812 .next_column_id(3)
1813 .build()
1814 .unwrap();
1815
1816 let alter_kind = AlterKind::DropColumns {
1817 names: vec![String::from("unknown")],
1818 };
1819
1820 let err = meta
1821 .builder_with_alter_kind("my_table", &alter_kind)
1822 .err()
1823 .unwrap();
1824 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1825 }
1826
1827 #[test]
1828 fn test_change_unknown_column_data_type() {
1829 let schema = Arc::new(new_test_schema());
1830 let meta = TableMetaBuilder::empty()
1831 .schema(schema)
1832 .primary_key_indices(vec![0])
1833 .engine("engine")
1834 .next_column_id(3)
1835 .build()
1836 .unwrap();
1837
1838 let alter_kind = AlterKind::ModifyColumnTypes {
1839 columns: vec![ModifyColumnTypeRequest {
1840 column_name: "unknown".to_string(),
1841 target_type: ConcreteDataType::string_datatype(),
1842 }],
1843 };
1844
1845 let err = meta
1846 .builder_with_alter_kind("my_table", &alter_kind)
1847 .err()
1848 .unwrap();
1849 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1850 }
1851
1852 #[test]
1853 fn test_remove_key_column() {
1854 let schema = Arc::new(new_test_schema());
1855 let meta = TableMetaBuilder::empty()
1856 .schema(schema)
1857 .primary_key_indices(vec![0])
1858 .engine("engine")
1859 .next_column_id(3)
1860 .build()
1861 .unwrap();
1862
1863 let alter_kind = AlterKind::DropColumns {
1865 names: vec![String::from("col1")],
1866 };
1867
1868 let err = meta
1869 .builder_with_alter_kind("my_table", &alter_kind)
1870 .err()
1871 .unwrap();
1872 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1873
1874 let alter_kind = AlterKind::DropColumns {
1876 names: vec![String::from("ts")],
1877 };
1878
1879 let err = meta
1880 .builder_with_alter_kind("my_table", &alter_kind)
1881 .err()
1882 .unwrap();
1883 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1884 }
1885
1886 #[test]
1887 fn test_remove_partition_column() {
1888 let schema = Arc::new(new_test_schema());
1889 let meta = TableMetaBuilder::empty()
1890 .schema(schema)
1891 .primary_key_indices(vec![])
1892 .partition_key_indices(vec![0])
1893 .engine("engine")
1894 .next_column_id(3)
1895 .build()
1896 .unwrap();
1897 let alter_kind = AlterKind::DropColumns {
1899 names: vec![String::from("col1")],
1900 };
1901
1902 let err = meta
1903 .builder_with_alter_kind("my_table", &alter_kind)
1904 .err()
1905 .unwrap();
1906 assert_matches!(err, Error::RemovePartitionColumn { .. });
1907 }
1908
1909 #[test]
1910 fn test_change_key_column_data_type() {
1911 let schema = Arc::new(new_test_schema());
1912 let meta = TableMetaBuilder::empty()
1913 .schema(schema)
1914 .primary_key_indices(vec![0])
1915 .engine("engine")
1916 .next_column_id(3)
1917 .build()
1918 .unwrap();
1919
1920 let alter_kind = AlterKind::ModifyColumnTypes {
1922 columns: vec![ModifyColumnTypeRequest {
1923 column_name: "col1".to_string(),
1924 target_type: ConcreteDataType::string_datatype(),
1925 }],
1926 };
1927
1928 let err = meta
1929 .builder_with_alter_kind("my_table", &alter_kind)
1930 .err()
1931 .unwrap();
1932 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1933
1934 let alter_kind = AlterKind::ModifyColumnTypes {
1936 columns: vec![ModifyColumnTypeRequest {
1937 column_name: "ts".to_string(),
1938 target_type: ConcreteDataType::string_datatype(),
1939 }],
1940 };
1941
1942 let err = meta
1943 .builder_with_alter_kind("my_table", &alter_kind)
1944 .err()
1945 .unwrap();
1946 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1947 }
1948
1949 #[test]
1950 fn test_alloc_new_column() {
1951 let schema = Arc::new(new_test_schema());
1952 let mut meta = TableMetaBuilder::empty()
1953 .schema(schema)
1954 .primary_key_indices(vec![0])
1955 .engine("engine")
1956 .next_column_id(3)
1957 .build()
1958 .unwrap();
1959 assert_eq!(3, meta.next_column_id);
1960
1961 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1962 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1963
1964 assert_eq!(4, meta.next_column_id);
1965 assert_eq!(column_schema.name, desc.name);
1966 }
1967
1968 #[test]
1969 fn test_add_columns_with_location() {
1970 let schema = Arc::new(new_test_schema());
1971 let meta = TableMetaBuilder::empty()
1972 .schema(schema)
1973 .primary_key_indices(vec![0])
1974 .partition_key_indices(vec![0, 2])
1976 .engine("engine")
1977 .next_column_id(3)
1978 .build()
1979 .unwrap();
1980
1981 let new_meta = add_columns_to_meta_with_location(&meta);
1982 assert_eq!(meta.region_numbers, new_meta.region_numbers);
1983
1984 let names: Vec<String> = new_meta
1985 .schema
1986 .column_schemas()
1987 .iter()
1988 .map(|column_schema| column_schema.name.clone())
1989 .collect();
1990 assert_eq!(
1991 &[
1992 "my_tag_first", "col1", "ts", "yet_another_field_after_ts", "my_field_after_ts", "col2", ],
1999 &names[..]
2000 );
2001 assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2002 assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2003 assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2004 }
2005
2006 #[test]
2007 fn test_modify_column_fulltext_options() {
2008 let schema = Arc::new(new_test_schema());
2009 let meta = TableMetaBuilder::empty()
2010 .schema(schema)
2011 .primary_key_indices(vec![0])
2012 .engine("engine")
2013 .next_column_id(3)
2014 .build()
2015 .unwrap();
2016
2017 let alter_kind = AlterKind::SetIndexes {
2018 options: vec![SetIndexOption::Fulltext {
2019 column_name: "col1".to_string(),
2020 options: FulltextOptions::default(),
2021 }],
2022 };
2023 let err = meta
2024 .builder_with_alter_kind("my_table", &alter_kind)
2025 .err()
2026 .unwrap();
2027 assert_eq!(
2028 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2029 err.to_string()
2030 );
2031
2032 let new_meta = add_columns_to_meta_with_location(&meta);
2034 assert_eq!(meta.region_numbers, new_meta.region_numbers);
2035
2036 let alter_kind = AlterKind::SetIndexes {
2037 options: vec![SetIndexOption::Fulltext {
2038 column_name: "my_tag_first".to_string(),
2039 options: FulltextOptions::new_unchecked(
2040 true,
2041 FulltextAnalyzer::Chinese,
2042 true,
2043 FulltextBackend::Bloom,
2044 1000,
2045 0.01,
2046 ),
2047 }],
2048 };
2049 let new_meta = new_meta
2050 .builder_with_alter_kind("my_table", &alter_kind)
2051 .unwrap()
2052 .build()
2053 .unwrap();
2054 let column_schema = new_meta
2055 .schema
2056 .column_schema_by_name("my_tag_first")
2057 .unwrap();
2058 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2059 assert!(fulltext_options.enable);
2060 assert_eq!(
2061 datatypes::schema::FulltextAnalyzer::Chinese,
2062 fulltext_options.analyzer
2063 );
2064 assert!(fulltext_options.case_sensitive);
2065
2066 let alter_kind = AlterKind::UnsetIndexes {
2067 options: vec![UnsetIndexOption::Fulltext {
2068 column_name: "my_tag_first".to_string(),
2069 }],
2070 };
2071 let new_meta = new_meta
2072 .builder_with_alter_kind("my_table", &alter_kind)
2073 .unwrap()
2074 .build()
2075 .unwrap();
2076 let column_schema = new_meta
2077 .schema
2078 .column_schema_by_name("my_tag_first")
2079 .unwrap();
2080 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2081 assert!(!fulltext_options.enable);
2082 }
2083}