1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25 ColumnSchema, FulltextOptions, Schema, SchemaBuilder, SchemaRef, SkippingIndexOptions,
26};
27use derive_builder::Builder;
28use serde::{Deserialize, Deserializer, Serialize};
29use snafu::{OptionExt, ResultExt, ensure};
30use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
31use store_api::mito_engine_options::{
32 APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, SST_FORMAT_KEY,
33};
34use store_api::region_request::{SetRegionOption, UnsetRegionOption};
35use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
36
37use crate::error::{self, Result};
38use crate::requests::{
39 AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
40 TableOptions, UnsetIndexOption,
41};
42use crate::table_reference::TableReference;
43
44pub type TableId = u32;
45pub type TableVersion = u64;
46
47#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
50pub enum FilterPushDownType {
51 Unsupported,
53 Inexact,
58 Exact,
62}
63
64impl From<TableProviderFilterPushDown> for FilterPushDownType {
65 fn from(value: TableProviderFilterPushDown) -> Self {
66 match value {
67 TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
68 TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
69 TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
70 }
71 }
72}
73
74impl From<FilterPushDownType> for TableProviderFilterPushDown {
75 fn from(value: FilterPushDownType) -> Self {
76 match value {
77 FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
78 FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
79 FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
80 }
81 }
82}
83
84#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
86pub enum TableType {
87 Base,
89 View,
91 Temporary,
93}
94
95impl std::fmt::Display for TableType {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 TableType::Base => f.write_str("BASE TABLE"),
99 TableType::Temporary => f.write_str("TEMPORARY"),
100 TableType::View => f.write_str("VIEW"),
101 }
102 }
103}
104
105impl From<TableType> for datafusion::datasource::TableType {
106 fn from(t: TableType) -> datafusion::datasource::TableType {
107 match t {
108 TableType::Base => datafusion::datasource::TableType::Base,
109 TableType::View => datafusion::datasource::TableType::View,
110 TableType::Temporary => datafusion::datasource::TableType::Temporary,
111 }
112 }
113}
114
115#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
117pub struct TableIdent {
118 pub table_id: TableId,
120 pub version: TableVersion,
123}
124
125#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder, Serialize)]
129#[builder(pattern = "mutable", custom_constructor)]
130pub struct TableMeta {
131 pub schema: SchemaRef,
132 pub primary_key_indices: Vec<usize>,
135 #[builder(default = "self.default_value_indices()?")]
136 pub value_indices: Vec<usize>,
137 #[builder(default, setter(into))]
138 pub engine: String,
139 pub next_column_id: ColumnId,
140 #[builder(default)]
142 pub options: TableOptions,
143 #[builder(default = "Utc::now()")]
144 pub created_on: DateTime<Utc>,
145 #[builder(default = "self.default_updated_on()")]
146 pub updated_on: DateTime<Utc>,
147 #[builder(default = "Vec::new()")]
148 pub partition_key_indices: Vec<usize>,
149 #[builder(default = "Vec::new()")]
150 pub column_ids: Vec<ColumnId>,
151}
152
153impl TableMeta {
154 pub fn empty() -> Self {
155 Self {
156 schema: Arc::new(Schema::new(vec![])),
157 primary_key_indices: vec![],
158 value_indices: vec![],
159 engine: "".to_string(),
160 next_column_id: 0,
161 options: TableOptions::default(),
162 created_on: Utc::now(),
163 updated_on: Utc::now(),
164 partition_key_indices: vec![],
165 column_ids: vec![],
166 }
167 }
168}
169
170impl<'de> Deserialize<'de> for TableMeta {
171 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
172 where
173 D: Deserializer<'de>,
174 {
175 #[derive(Deserialize)]
176 struct RawTableMeta {
177 schema: SchemaRef,
178 primary_key_indices: Vec<usize>,
179 value_indices: Vec<usize>,
180 engine: String,
181 next_column_id: ColumnId,
182 options: TableOptions,
183 created_on: DateTime<Utc>,
184 updated_on: Option<DateTime<Utc>>,
185 #[serde(default)]
186 partition_key_indices: Vec<usize>,
187 #[serde(default)]
188 column_ids: Vec<ColumnId>,
189 }
190
191 let RawTableMeta {
192 schema,
193 primary_key_indices,
194 value_indices,
195 engine,
196 next_column_id,
197 options,
198 created_on,
199 updated_on,
200 partition_key_indices,
201 column_ids,
202 } = RawTableMeta::deserialize(deserializer)?;
203
204 Ok(Self {
205 schema,
206 primary_key_indices,
207 value_indices,
208 engine,
209 next_column_id,
210 options,
211 created_on,
212 updated_on: updated_on.unwrap_or(created_on),
213 partition_key_indices,
214 column_ids,
215 })
216 }
217}
218
219impl TableMetaBuilder {
220 #[cfg(any(test, feature = "testing"))]
222 pub fn empty() -> Self {
223 Self {
224 schema: None,
225 primary_key_indices: None,
226 value_indices: None,
227 engine: None,
228 next_column_id: None,
229 options: None,
230 created_on: None,
231 updated_on: None,
232 partition_key_indices: None,
233 column_ids: None,
234 }
235 }
236}
237
238impl TableMetaBuilder {
239 fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
240 match (&self.primary_key_indices, &self.schema) {
241 (Some(v), Some(schema)) => {
242 let column_schemas = schema.column_schemas();
243 Ok((0..column_schemas.len())
244 .filter(|idx| !v.contains(idx))
245 .collect())
246 }
247 _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
248 }
249 }
250
251 fn default_updated_on(&self) -> DateTime<Utc> {
252 self.created_on.unwrap_or_default()
253 }
254
255 pub fn new_external_table() -> Self {
256 Self {
257 schema: None,
258 primary_key_indices: Some(Vec::new()),
259 value_indices: Some(Vec::new()),
260 engine: None,
261 next_column_id: Some(0),
262 options: None,
263 created_on: None,
264 updated_on: None,
265 partition_key_indices: None,
266 column_ids: None,
267 }
268 }
269}
270
271struct SplitResult<'a> {
273 columns_at_first: Vec<&'a AddColumnRequest>,
275 columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
277 columns_at_last: Vec<&'a AddColumnRequest>,
279 column_names: Vec<String>,
281}
282
283impl TableMeta {
284 pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
285 let columns_schemas = &self.schema.column_schemas();
286 self.primary_key_indices
287 .iter()
288 .map(|idx| &columns_schemas[*idx].name)
289 }
290
291 pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
292 let columns_schemas = self.schema.column_schemas();
294 let primary_key_indices = &self.primary_key_indices;
295 columns_schemas
296 .iter()
297 .enumerate()
298 .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
299 .map(|(_, cs)| &cs.name)
300 }
301
302 pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
303 let columns_schemas = &self.schema.column_schemas();
304 self.partition_key_indices
305 .iter()
306 .map(|idx| &columns_schemas[*idx].name)
307 }
308
309 pub fn partition_columns(&self) -> impl Iterator<Item = &ColumnSchema> {
310 self.partition_key_indices
311 .iter()
312 .map(|idx| &self.schema.column_schemas()[*idx])
313 }
314
315 pub fn builder_with_alter_kind(
319 &self,
320 table_name: &str,
321 alter_kind: &AlterKind,
322 ) -> Result<TableMetaBuilder> {
323 let mut builder = match alter_kind {
324 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
325 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
326 AlterKind::ModifyColumnTypes { columns } => {
327 self.modify_column_types(table_name, columns)
328 }
329 AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
331 AlterKind::SetTableOptions { options } => self.set_table_options(options),
332 AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
333 AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
334 AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
335 AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
336 AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
337 }?;
338 let _ = builder.updated_on(Utc::now());
339 Ok(builder)
340 }
341
342 fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
344 let mut new_options = self.options.clone();
345
346 for request in requests {
347 match request {
348 SetRegionOption::Ttl(new_ttl) => {
349 new_options.ttl = *new_ttl;
350 }
351 SetRegionOption::Twsc(key, value) => {
352 if !value.is_empty() {
353 new_options.extra_options.insert(key.clone(), value.clone());
354 new_options.extra_options.insert(
356 COMPACTION_TYPE.to_string(),
357 COMPACTION_TYPE_TWCS.to_string(),
358 );
359 } else {
360 new_options.extra_options.remove(key.as_str());
362 }
363 }
364 SetRegionOption::Format(value) => {
365 new_options
366 .extra_options
367 .insert(SST_FORMAT_KEY.to_string(), value.clone());
368 }
369 SetRegionOption::AppendMode(value) => {
370 new_options
371 .extra_options
372 .insert(APPEND_MODE_KEY.to_string(), value.to_string());
373 if *value {
374 new_options.extra_options.remove(MERGE_MODE_KEY);
375 }
376 }
377 }
378 }
379 let mut builder = self.new_meta_builder();
380 builder.options(new_options);
381
382 Ok(builder)
383 }
384
385 fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
386 let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
387 self.set_table_options(&requests)
388 }
389
390 fn set_indexes(
391 &self,
392 table_name: &str,
393 requests: &[SetIndexOption],
394 ) -> Result<TableMetaBuilder> {
395 let table_schema = &self.schema;
396 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
397 for request in requests {
398 let column_name = request.column_name();
399 table_schema
400 .column_index_by_name(column_name)
401 .with_context(|| error::ColumnNotExistsSnafu {
402 column_name,
403 table_name,
404 })?;
405 set_index_options
406 .entry(column_name)
407 .or_default()
408 .push(request);
409 }
410
411 let mut meta_builder = self.new_meta_builder();
412 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
413 for mut column in table_schema.column_schemas().iter().cloned() {
414 if let Some(request) = set_index_options.get(column.name.as_str()) {
415 for request in request {
416 self.set_index(&mut column, request)?;
417 }
418 }
419 columns.push(column);
420 }
421
422 let mut builder = SchemaBuilder::try_from_columns(columns)
423 .with_context(|_| error::SchemaBuildSnafu {
424 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
425 })?
426 .version(table_schema.version() + 1);
427
428 for (k, v) in table_schema.metadata().iter() {
429 builder = builder.add_metadata(k, v);
430 }
431
432 let new_schema = builder.build().with_context(|_| {
433 let column_names = requests
434 .iter()
435 .map(|request| request.column_name())
436 .collect::<Vec<_>>();
437 error::SchemaBuildSnafu {
438 msg: format!(
439 "Table {table_name} cannot set index options with columns {column_names:?}",
440 ),
441 }
442 })?;
443 let _ = meta_builder
444 .schema(Arc::new(new_schema))
445 .primary_key_indices(self.primary_key_indices.clone());
446
447 Ok(meta_builder)
448 }
449
450 fn unset_indexes(
451 &self,
452 table_name: &str,
453 requests: &[UnsetIndexOption],
454 ) -> Result<TableMetaBuilder> {
455 let table_schema = &self.schema;
456 let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
457 for request in requests {
458 let column_name = request.column_name();
459 table_schema
460 .column_index_by_name(column_name)
461 .with_context(|| error::ColumnNotExistsSnafu {
462 column_name,
463 table_name,
464 })?;
465 set_index_options
466 .entry(column_name)
467 .or_default()
468 .push(request);
469 }
470
471 let mut meta_builder = self.new_meta_builder();
472 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
473 for mut column in table_schema.column_schemas().iter().cloned() {
474 if let Some(request) = set_index_options.get(column.name.as_str()) {
475 for request in request {
476 self.unset_index(&mut column, request)?;
477 }
478 }
479 columns.push(column);
480 }
481
482 let mut builder = SchemaBuilder::try_from_columns(columns)
483 .with_context(|_| error::SchemaBuildSnafu {
484 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
485 })?
486 .version(table_schema.version() + 1);
487
488 for (k, v) in table_schema.metadata().iter() {
489 builder = builder.add_metadata(k, v);
490 }
491
492 let new_schema = builder.build().with_context(|_| {
493 let column_names = requests
494 .iter()
495 .map(|request| request.column_name())
496 .collect::<Vec<_>>();
497 error::SchemaBuildSnafu {
498 msg: format!(
499 "Table {table_name} cannot set index options with columns {column_names:?}",
500 ),
501 }
502 })?;
503 let _ = meta_builder
504 .schema(Arc::new(new_schema))
505 .primary_key_indices(self.primary_key_indices.clone());
506
507 Ok(meta_builder)
508 }
509
510 fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
511 match request {
512 SetIndexOption::Fulltext {
513 column_name,
514 options,
515 } => {
516 ensure!(
517 column_schema.data_type.is_string(),
518 error::InvalidColumnOptionSnafu {
519 column_name,
520 msg: "FULLTEXT index only supports string type",
521 }
522 );
523
524 let current_fulltext_options = column_schema
525 .fulltext_options()
526 .context(error::SetFulltextOptionsSnafu { column_name })?;
527 set_column_fulltext_options(
528 column_schema,
529 column_name,
530 options,
531 current_fulltext_options,
532 )?;
533 }
534 SetIndexOption::Inverted { column_name } => {
535 debug_assert_eq!(column_schema.name, *column_name);
536 column_schema.set_inverted_index(true);
537 }
538 SetIndexOption::Skipping {
539 column_name,
540 options,
541 } => {
542 set_column_skipping_index_options(column_schema, column_name, options)?;
543 }
544 }
545
546 Ok(())
547 }
548
549 fn unset_index(
550 &self,
551 column_schema: &mut ColumnSchema,
552 request: &UnsetIndexOption,
553 ) -> Result<()> {
554 match request {
555 UnsetIndexOption::Fulltext { column_name } => {
556 let current_fulltext_options = column_schema
557 .fulltext_options()
558 .context(error::SetFulltextOptionsSnafu { column_name })?;
559 unset_column_fulltext_options(
560 column_schema,
561 column_name,
562 current_fulltext_options.clone(),
563 )?
564 }
565 UnsetIndexOption::Inverted { .. } => {
566 column_schema.set_inverted_index(false);
567 }
568 UnsetIndexOption::Skipping { column_name } => {
569 unset_column_skipping_index_options(column_schema, column_name)?;
570 }
571 }
572
573 Ok(())
574 }
575
576 pub fn alloc_new_column(
581 &mut self,
582 table_name: &str,
583 new_column: &ColumnSchema,
584 ) -> Result<ColumnDescriptor> {
585 let desc = ColumnDescriptorBuilder::new(
586 self.next_column_id as ColumnId,
587 &new_column.name,
588 new_column.data_type.clone(),
589 )
590 .is_nullable(new_column.is_nullable())
591 .default_constraint(new_column.default_constraint().cloned())
592 .build()
593 .context(error::BuildColumnDescriptorSnafu {
594 table_name,
595 column_name: &new_column.name,
596 })?;
597
598 self.next_column_id += 1;
600
601 Ok(desc)
602 }
603
604 fn new_meta_builder(&self) -> TableMetaBuilder {
606 let mut builder = TableMetaBuilder::from(self);
607 builder.value_indices = None;
609 builder
610 }
611
612 fn add_columns(
614 &self,
615 table_name: &str,
616 requests: &[AddColumnRequest],
617 ) -> Result<TableMetaBuilder> {
618 let table_schema = &self.schema;
619 let mut meta_builder = self.new_meta_builder();
620 let original_primary_key_indices: HashSet<&usize> =
621 self.primary_key_indices.iter().collect();
622
623 let mut names = HashSet::with_capacity(requests.len());
624 let mut new_columns = Vec::with_capacity(requests.len());
625 for col_to_add in requests {
626 if let Some(column_schema) =
627 table_schema.column_schema_by_name(&col_to_add.column_schema.name)
628 {
629 ensure!(
631 col_to_add.add_if_not_exists,
632 error::ColumnExistsSnafu {
633 table_name,
634 column_name: &col_to_add.column_schema.name
635 },
636 );
637
638 ensure!(
640 column_schema.data_type == col_to_add.column_schema.data_type,
641 error::InvalidAlterRequestSnafu {
642 table: table_name,
643 err: format!(
644 "column {} already exists with different type {:?}",
645 col_to_add.column_schema.name, column_schema.data_type,
646 ),
647 }
648 );
649 } else {
650 ensure!(
653 names.insert(&col_to_add.column_schema.name),
654 error::InvalidAlterRequestSnafu {
655 table: table_name,
656 err: format!(
657 "add column {} more than once",
658 col_to_add.column_schema.name
659 ),
660 }
661 );
662
663 ensure!(
664 col_to_add.column_schema.is_nullable()
665 || col_to_add.column_schema.default_constraint().is_some(),
666 error::InvalidAlterRequestSnafu {
667 table: table_name,
668 err: format!(
669 "no default value for column {}",
670 col_to_add.column_schema.name
671 ),
672 },
673 );
674
675 new_columns.push(col_to_add.clone());
676 }
677 }
678 let requests = &new_columns[..];
679
680 let SplitResult {
681 columns_at_first,
682 columns_at_after,
683 columns_at_last,
684 column_names,
685 } = self.split_requests_by_column_location(table_name, requests)?;
686 let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
687 let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
688 columns_at_first.iter().rev().for_each(|request| {
690 if request.is_key {
691 primary_key_indices.push(columns.len());
693 }
694 columns.push(request.column_schema.clone());
695 });
696 for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
698 if original_primary_key_indices.contains(&index) {
699 primary_key_indices.push(columns.len());
700 }
701 columns.push(column_schema.clone());
702 if let Some(requests) = columns_at_after.get(&column_schema.name) {
703 requests.iter().rev().for_each(|request| {
704 if request.is_key {
705 primary_key_indices.push(columns.len());
707 }
708 columns.push(request.column_schema.clone());
709 });
710 }
711 }
712 columns_at_last.iter().for_each(|request| {
714 if request.is_key {
715 primary_key_indices.push(columns.len());
717 }
718 columns.push(request.column_schema.clone());
719 });
720
721 let mut builder = SchemaBuilder::try_from(columns)
722 .with_context(|_| error::SchemaBuildSnafu {
723 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
724 })?
725 .version(table_schema.version() + 1);
727 for (k, v) in table_schema.metadata().iter() {
728 builder = builder.add_metadata(k, v);
729 }
730 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
731 msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
732 })?;
733
734 let partition_key_indices = self
735 .partition_key_indices
736 .iter()
737 .map(|idx| table_schema.column_name_by_index(*idx))
738 .map(|name| new_schema.column_index_by_name(name).unwrap())
740 .collect();
741
742 let _ = meta_builder
744 .schema(Arc::new(new_schema))
745 .primary_key_indices(primary_key_indices)
746 .partition_key_indices(partition_key_indices);
747
748 Ok(meta_builder)
749 }
750
751 fn remove_columns(
752 &self,
753 table_name: &str,
754 column_names: &[String],
755 ) -> Result<TableMetaBuilder> {
756 let table_schema = &self.schema;
757 let column_names: HashSet<_> = column_names.iter().collect();
758 let mut meta_builder = self.new_meta_builder();
759
760 let timestamp_index = table_schema.timestamp_index();
761 for column_name in &column_names {
763 if let Some(index) = table_schema.column_index_by_name(column_name) {
764 ensure!(
767 !self.primary_key_indices.contains(&index),
768 error::RemoveColumnInIndexSnafu {
769 column_name: *column_name,
770 table_name,
771 }
772 );
773
774 ensure!(
775 !self.partition_key_indices.contains(&index),
776 error::RemovePartitionColumnSnafu {
777 column_name: *column_name,
778 table_name,
779 }
780 );
781
782 if let Some(ts_index) = timestamp_index {
783 ensure!(
785 index != ts_index,
786 error::RemoveColumnInIndexSnafu {
787 column_name: table_schema.column_name_by_index(ts_index),
788 table_name,
789 }
790 );
791 }
792 } else {
793 return error::ColumnNotExistsSnafu {
794 column_name: *column_name,
795 table_name,
796 }
797 .fail()?;
798 }
799 }
800
801 let columns: Vec<_> = table_schema
803 .column_schemas()
804 .iter()
805 .filter(|column_schema| !column_names.contains(&column_schema.name))
806 .cloned()
807 .collect();
808
809 let mut builder = SchemaBuilder::try_from_columns(columns)
810 .with_context(|_| error::SchemaBuildSnafu {
811 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
812 })?
813 .version(table_schema.version() + 1);
815 for (k, v) in table_schema.metadata().iter() {
816 builder = builder.add_metadata(k, v);
817 }
818 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
819 msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
820 })?;
821
822 let primary_key_indices = self
824 .primary_key_indices
825 .iter()
826 .map(|idx| table_schema.column_name_by_index(*idx))
827 .map(|name| new_schema.column_index_by_name(name).unwrap())
829 .collect();
830
831 let partition_key_indices = self
832 .partition_key_indices
833 .iter()
834 .map(|idx| table_schema.column_name_by_index(*idx))
835 .map(|name| new_schema.column_index_by_name(name).unwrap())
837 .collect();
838
839 let _ = meta_builder
840 .schema(Arc::new(new_schema))
841 .primary_key_indices(primary_key_indices)
842 .partition_key_indices(partition_key_indices);
843
844 Ok(meta_builder)
845 }
846
847 fn modify_column_types(
848 &self,
849 table_name: &str,
850 requests: &[ModifyColumnTypeRequest],
851 ) -> Result<TableMetaBuilder> {
852 let table_schema = &self.schema;
853 let mut meta_builder = self.new_meta_builder();
854
855 let mut modify_column_types = HashMap::with_capacity(requests.len());
856 let timestamp_index = table_schema.timestamp_index();
857
858 for col_to_change in requests {
859 let change_column_name = &col_to_change.column_name;
860
861 let index = table_schema
862 .column_index_by_name(change_column_name)
863 .with_context(|| error::ColumnNotExistsSnafu {
864 column_name: change_column_name,
865 table_name,
866 })?;
867
868 let column = &table_schema.column_schemas()[index];
869
870 ensure!(
871 !self.primary_key_indices.contains(&index),
872 error::InvalidAlterRequestSnafu {
873 table: table_name,
874 err: format!(
875 "Not allowed to change primary key index column '{}'",
876 column.name
877 )
878 }
879 );
880
881 if let Some(ts_index) = timestamp_index {
882 ensure!(
884 index != ts_index,
885 error::InvalidAlterRequestSnafu {
886 table: table_name,
887 err: format!(
888 "Not allowed to change timestamp index column '{}' datatype",
889 column.name
890 )
891 }
892 );
893 }
894
895 ensure!(
896 modify_column_types
897 .insert(&col_to_change.column_name, col_to_change)
898 .is_none(),
899 error::InvalidAlterRequestSnafu {
900 table: table_name,
901 err: format!(
902 "change column datatype {} more than once",
903 col_to_change.column_name
904 ),
905 }
906 );
907
908 ensure!(
909 column
910 .data_type
911 .can_arrow_type_cast_to(&col_to_change.target_type),
912 error::InvalidAlterRequestSnafu {
913 table: table_name,
914 err: format!(
915 "column '{}' cannot be cast automatically to type '{}'",
916 col_to_change.column_name, col_to_change.target_type,
917 ),
918 }
919 );
920
921 ensure!(
922 column.is_nullable(),
923 error::InvalidAlterRequestSnafu {
924 table: table_name,
925 err: format!(
926 "column '{}' must be nullable to ensure safe conversion.",
927 col_to_change.column_name,
928 ),
929 }
930 );
931 }
932 let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
935 for mut column in table_schema.column_schemas().iter().cloned() {
936 if let Some(change_column) = modify_column_types.get(&column.name) {
937 column.data_type = change_column.target_type.clone();
938 let new_default = if let Some(default_value) = column.default_constraint() {
939 Some(
940 default_value
941 .cast_to_datatype(&change_column.target_type)
942 .with_context(|_| error::CastDefaultValueSnafu {
943 reason: format!(
944 "Failed to cast default value from {:?} to type {:?}",
945 default_value, &change_column.target_type
946 ),
947 })?,
948 )
949 } else {
950 None
951 };
952 column = column
953 .clone()
954 .with_default_constraint(new_default.clone())
955 .with_context(|_| error::CastDefaultValueSnafu {
956 reason: format!("Failed to set new default: {:?}", new_default),
957 })?;
958 }
959 columns.push(column)
960 }
961
962 let mut builder = SchemaBuilder::try_from_columns(columns)
963 .with_context(|_| error::SchemaBuildSnafu {
964 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
965 })?
966 .version(table_schema.version() + 1);
968 for (k, v) in table_schema.metadata().iter() {
969 builder = builder.add_metadata(k, v);
970 }
971 let new_schema = builder.build().with_context(|_| {
972 let column_names: Vec<_> = requests
973 .iter()
974 .map(|request| &request.column_name)
975 .collect();
976
977 error::SchemaBuildSnafu {
978 msg: format!(
979 "Table {table_name} cannot change datatype with columns {column_names:?}"
980 ),
981 }
982 })?;
983
984 let _ = meta_builder
985 .schema(Arc::new(new_schema))
986 .primary_key_indices(self.primary_key_indices.clone());
987
988 Ok(meta_builder)
989 }
990
991 fn split_requests_by_column_location<'a>(
993 &self,
994 table_name: &str,
995 requests: &'a [AddColumnRequest],
996 ) -> Result<SplitResult<'a>> {
997 let table_schema = &self.schema;
998 let mut columns_at_first = Vec::new();
999 let mut columns_at_after = HashMap::new();
1000 let mut columns_at_last = Vec::new();
1001 let mut column_names = Vec::with_capacity(requests.len());
1002 for request in requests {
1003 let column_name = &request.column_schema.name;
1005 column_names.push(column_name.clone());
1006 ensure!(
1007 table_schema.column_schema_by_name(column_name).is_none(),
1008 error::ColumnExistsSnafu {
1009 column_name,
1010 table_name,
1011 }
1012 );
1013 match request.location.as_ref() {
1014 Some(AddColumnLocation::First) => {
1015 columns_at_first.push(request);
1016 }
1017 Some(AddColumnLocation::After { column_name }) => {
1018 ensure!(
1019 table_schema.column_schema_by_name(column_name).is_some(),
1020 error::ColumnNotExistsSnafu {
1021 column_name,
1022 table_name,
1023 }
1024 );
1025 columns_at_after
1026 .entry(column_name.clone())
1027 .or_insert(Vec::new())
1028 .push(request);
1029 }
1030 None => {
1031 columns_at_last.push(request);
1032 }
1033 }
1034 }
1035 Ok(SplitResult {
1036 columns_at_first,
1037 columns_at_after,
1038 columns_at_last,
1039 column_names,
1040 })
1041 }
1042
1043 fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
1044 let table_schema = &self.schema;
1045 let mut meta_builder = self.new_meta_builder();
1046 let mut columns = Vec::with_capacity(table_schema.num_columns());
1047 for column_schema in table_schema.column_schemas() {
1048 if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
1049 ensure!(
1051 column_schema.default_constraint().is_some(),
1052 error::InvalidAlterRequestSnafu {
1053 table: table_name,
1054 err: format!("column {name} does not have a default value"),
1055 }
1056 );
1057 if !column_schema.is_nullable() {
1058 return error::InvalidAlterRequestSnafu {
1059 table: table_name,
1060 err: format!(
1061 "column {name} is not nullable and `default` cannot be dropped",
1062 ),
1063 }
1064 .fail();
1065 }
1066 let new_column_schema = column_schema.clone();
1067 let new_column_schema = new_column_schema
1068 .with_default_constraint(None)
1069 .with_context(|_| error::SchemaBuildSnafu {
1070 msg: format!("Table {table_name} cannot drop default values"),
1071 })?;
1072 columns.push(new_column_schema);
1073 } else {
1074 columns.push(column_schema.clone());
1075 }
1076 }
1077
1078 let mut builder = SchemaBuilder::try_from_columns(columns)
1079 .with_context(|_| error::SchemaBuildSnafu {
1080 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1081 })?
1082 .version(table_schema.version() + 1);
1084 for (k, v) in table_schema.metadata().iter() {
1085 builder = builder.add_metadata(k, v);
1086 }
1087 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1088 msg: format!("Table {table_name} cannot drop default values"),
1089 })?;
1090
1091 let _ = meta_builder.schema(Arc::new(new_schema));
1092
1093 Ok(meta_builder)
1094 }
1095
1096 fn set_defaults(
1097 &self,
1098 table_name: &str,
1099 set_defaults: &[SetDefaultRequest],
1100 ) -> Result<TableMetaBuilder> {
1101 let table_schema = &self.schema;
1102 let mut meta_builder = self.new_meta_builder();
1103 let mut columns = Vec::with_capacity(table_schema.num_columns());
1104 for column_schema in table_schema.column_schemas() {
1105 if let Some(set_default) = set_defaults
1106 .iter()
1107 .find(|s| s.column_name == column_schema.name)
1108 {
1109 let new_column_schema = column_schema.clone();
1110 let new_column_schema = new_column_schema
1111 .with_default_constraint(set_default.default_constraint.clone())
1112 .with_context(|_| error::SchemaBuildSnafu {
1113 msg: format!("Table {table_name} cannot set default values"),
1114 })?;
1115 columns.push(new_column_schema);
1116 } else {
1117 columns.push(column_schema.clone());
1118 }
1119 }
1120
1121 let mut builder = SchemaBuilder::try_from_columns(columns)
1122 .with_context(|_| error::SchemaBuildSnafu {
1123 msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1124 })?
1125 .version(table_schema.version() + 1);
1127 for (k, v) in table_schema.metadata().iter() {
1128 builder = builder.add_metadata(k, v);
1129 }
1130 let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1131 msg: format!("Table {table_name} cannot set default values"),
1132 })?;
1133
1134 let _ = meta_builder.schema(Arc::new(new_schema));
1135
1136 Ok(meta_builder)
1137 }
1138}
1139
1140#[derive(Clone, Debug, PartialEq, Eq, Builder, Serialize, Deserialize)]
1141#[builder(pattern = "owned")]
1142pub struct TableInfo {
1143 #[builder(default, setter(into))]
1145 pub ident: TableIdent,
1146 #[builder(setter(into))]
1148 pub name: String,
1149 #[builder(default, setter(into))]
1151 pub desc: Option<String>,
1152 #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1153 pub catalog_name: String,
1154 #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1155 pub schema_name: String,
1156 pub meta: TableMeta,
1157 #[builder(default = "TableType::Base")]
1158 pub table_type: TableType,
1159}
1160
1161pub type TableInfoRef = Arc<TableInfo>;
1162
1163impl TableInfo {
1164 pub fn table_id(&self) -> TableId {
1165 self.ident.table_id
1166 }
1167
1168 pub fn full_table_name(&self) -> String {
1170 common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1171 }
1172
1173 pub fn get_db_string(&self) -> String {
1174 common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1175 }
1176
1177 pub fn is_physical_table(&self) -> bool {
1179 self.meta
1180 .options
1181 .extra_options
1182 .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1183 }
1184
1185 pub fn is_ttl_instant_table(&self) -> bool {
1187 self.meta
1188 .options
1189 .ttl
1190 .map(|t| t.is_instant())
1191 .unwrap_or(false)
1192 }
1193}
1194
1195impl TableInfoBuilder {
1196 pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1197 Self {
1198 name: Some(name.into()),
1199 meta: Some(meta),
1200 ..Default::default()
1201 }
1202 }
1203
1204 pub fn table_id(mut self, id: TableId) -> Self {
1205 let ident = self.ident.get_or_insert_with(TableIdent::default);
1206 ident.table_id = id;
1207 self
1208 }
1209
1210 pub fn table_version(mut self, version: TableVersion) -> Self {
1211 let ident = self.ident.get_or_insert_with(TableIdent::default);
1212 ident.version = version;
1213 self
1214 }
1215}
1216
1217impl TableIdent {
1218 pub fn new(table_id: TableId) -> Self {
1219 Self {
1220 table_id,
1221 version: 0,
1222 }
1223 }
1224}
1225
1226impl From<TableId> for TableIdent {
1227 fn from(table_id: TableId) -> Self {
1228 Self::new(table_id)
1229 }
1230}
1231
1232impl TableInfo {
1233 pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1237 let column_schemas = self.meta.schema.column_schemas();
1238 if self.meta.column_ids.len() != column_schemas.len() {
1239 None
1240 } else {
1241 Some(
1242 self.meta
1243 .column_ids
1244 .iter()
1245 .enumerate()
1246 .map(|(index, id)| (column_schemas[index].name.clone(), *id))
1247 .collect(),
1248 )
1249 }
1250 }
1251
1252 pub fn sort_columns(&mut self) {
1254 let column_schemas = self.meta.schema.column_schemas();
1255 let primary_keys = self
1256 .meta
1257 .primary_key_indices
1258 .iter()
1259 .map(|index| column_schemas[*index].name.clone())
1260 .collect::<HashSet<_>>();
1261
1262 let name_to_ids = self.name_to_ids().unwrap_or_default();
1263 let mut column_schemas = column_schemas.to_vec();
1264 column_schemas.sort_unstable_by(|a, b| a.name.cmp(&b.name));
1265
1266 let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1268 let mut value_indices = Vec::with_capacity(column_schemas.len() - primary_keys.len());
1269 let mut column_ids = Vec::with_capacity(column_schemas.len());
1270 for (index, column_schema) in column_schemas.iter().enumerate() {
1271 if primary_keys.contains(&column_schema.name) {
1272 primary_key_indices.push(index);
1273 } else {
1274 value_indices.push(index);
1275 }
1276 if let Some(id) = name_to_ids.get(&column_schema.name) {
1277 column_ids.push(*id);
1278 }
1279 }
1280
1281 self.meta.schema = Arc::new(Schema::new_with_version(
1283 column_schemas,
1284 self.meta.schema.version(),
1285 ));
1286 self.meta.primary_key_indices = primary_key_indices;
1287 self.meta.value_indices = value_indices;
1288 self.meta.column_ids = column_ids;
1289 }
1290
1291 pub fn to_region_options(&self) -> HashMap<String, String> {
1295 HashMap::from(&self.meta.options)
1296 }
1297
1298 pub fn table_ref(&self) -> TableReference<'_> {
1300 TableReference::full(
1301 self.catalog_name.as_str(),
1302 self.schema_name.as_str(),
1303 self.name.as_str(),
1304 )
1305 }
1306}
1307
1308fn set_column_fulltext_options(
1317 column_schema: &mut ColumnSchema,
1318 column_name: &str,
1319 options: &FulltextOptions,
1320 current_options: Option<FulltextOptions>,
1321) -> Result<()> {
1322 if let Some(current_options) = current_options {
1323 ensure!(
1324 current_options.analyzer == options.analyzer
1325 && current_options.case_sensitive == options.case_sensitive,
1326 error::InvalidColumnOptionSnafu {
1327 column_name,
1328 msg: format!(
1329 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1330 current_options.analyzer, current_options.case_sensitive
1331 ),
1332 }
1333 );
1334 }
1335
1336 column_schema
1337 .set_fulltext_options(options)
1338 .context(error::SetFulltextOptionsSnafu { column_name })?;
1339
1340 Ok(())
1341}
1342
1343fn unset_column_fulltext_options(
1344 column_schema: &mut ColumnSchema,
1345 column_name: &str,
1346 current_options: Option<FulltextOptions>,
1347) -> Result<()> {
1348 ensure!(
1349 current_options
1350 .as_ref()
1351 .is_some_and(|options| options.enable),
1352 error::InvalidColumnOptionSnafu {
1353 column_name,
1354 msg: "FULLTEXT index already disabled".to_string(),
1355 }
1356 );
1357
1358 let mut options = current_options.unwrap();
1359 options.enable = false;
1360 column_schema
1361 .set_fulltext_options(&options)
1362 .context(error::SetFulltextOptionsSnafu { column_name })?;
1363
1364 Ok(())
1365}
1366
1367fn set_column_skipping_index_options(
1368 column_schema: &mut ColumnSchema,
1369 column_name: &str,
1370 options: &SkippingIndexOptions,
1371) -> Result<()> {
1372 column_schema
1373 .set_skipping_options(options)
1374 .context(error::SetSkippingOptionsSnafu { column_name })?;
1375
1376 Ok(())
1377}
1378
1379fn unset_column_skipping_index_options(
1380 column_schema: &mut ColumnSchema,
1381 column_name: &str,
1382) -> Result<()> {
1383 column_schema
1384 .unset_skipping_options()
1385 .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1386 Ok(())
1387}
1388
1389#[cfg(test)]
1390mod tests {
1391 use std::assert_matches::assert_matches;
1392
1393 use common_error::ext::ErrorExt;
1394 use common_error::status_code::StatusCode;
1395 use datatypes::data_type::ConcreteDataType;
1396 use datatypes::schema::{
1397 ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1398 };
1399
1400 use super::*;
1401 use crate::Error;
1402
1403 fn new_test_schema() -> Schema {
1405 let column_schemas = vec![
1406 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1407 ColumnSchema::new(
1408 "ts",
1409 ConcreteDataType::timestamp_millisecond_datatype(),
1410 false,
1411 )
1412 .with_time_index(true),
1413 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1414 ];
1415 SchemaBuilder::try_from(column_schemas)
1416 .unwrap()
1417 .version(123)
1418 .build()
1419 .unwrap()
1420 }
1421
1422 fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1423 let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1424 let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1425 let alter_kind = AlterKind::AddColumns {
1426 columns: vec![
1427 AddColumnRequest {
1428 column_schema: new_tag,
1429 is_key: true,
1430 location: None,
1431 add_if_not_exists: false,
1432 },
1433 AddColumnRequest {
1434 column_schema: new_field,
1435 is_key: false,
1436 location: None,
1437 add_if_not_exists: false,
1438 },
1439 ],
1440 };
1441
1442 let builder = meta
1443 .builder_with_alter_kind("my_table", &alter_kind)
1444 .unwrap();
1445 builder.build().unwrap()
1446 }
1447
1448 fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1449 let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1450 let new_field = ColumnSchema::new(
1451 "my_field_after_ts",
1452 ConcreteDataType::string_datatype(),
1453 true,
1454 );
1455 let yet_another_field = ColumnSchema::new(
1456 "yet_another_field_after_ts",
1457 ConcreteDataType::int64_datatype(),
1458 true,
1459 );
1460 let alter_kind = AlterKind::AddColumns {
1461 columns: vec![
1462 AddColumnRequest {
1463 column_schema: new_tag,
1464 is_key: true,
1465 location: Some(AddColumnLocation::First),
1466 add_if_not_exists: false,
1467 },
1468 AddColumnRequest {
1469 column_schema: new_field,
1470 is_key: false,
1471 location: Some(AddColumnLocation::After {
1472 column_name: "ts".to_string(),
1473 }),
1474 add_if_not_exists: false,
1475 },
1476 AddColumnRequest {
1477 column_schema: yet_another_field,
1478 is_key: true,
1479 location: Some(AddColumnLocation::After {
1480 column_name: "ts".to_string(),
1481 }),
1482 add_if_not_exists: false,
1483 },
1484 ],
1485 };
1486
1487 let builder = meta
1488 .builder_with_alter_kind("my_table", &alter_kind)
1489 .unwrap();
1490 builder.build().unwrap()
1491 }
1492
1493 #[test]
1494 fn test_add_columns() {
1495 let schema = Arc::new(new_test_schema());
1496 let meta = TableMetaBuilder::empty()
1497 .schema(schema)
1498 .primary_key_indices(vec![0])
1499 .engine("engine")
1500 .next_column_id(3)
1501 .build()
1502 .unwrap();
1503
1504 let new_meta = add_columns_to_meta(&meta);
1505 let names: Vec<String> = new_meta
1506 .schema
1507 .column_schemas()
1508 .iter()
1509 .map(|column_schema| column_schema.name.clone())
1510 .collect();
1511 assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1512 assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1513 assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1514 }
1515
1516 #[test]
1517 fn test_set_append_mode_true_clears_merge_mode_option() {
1518 let schema = Arc::new(new_test_schema());
1519 let mut table_options = TableOptions::default();
1520 table_options
1521 .extra_options
1522 .insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1523 let meta = TableMetaBuilder::empty()
1524 .schema(schema)
1525 .primary_key_indices(vec![0])
1526 .engine("engine")
1527 .next_column_id(3)
1528 .options(table_options)
1529 .build()
1530 .unwrap();
1531
1532 let alter_kind = AlterKind::SetTableOptions {
1533 options: vec![SetRegionOption::AppendMode(true)],
1534 };
1535 let new_meta = meta
1536 .builder_with_alter_kind("my_table", &alter_kind)
1537 .unwrap()
1538 .build()
1539 .unwrap();
1540
1541 assert_eq!(
1542 Some("true"),
1543 new_meta
1544 .options
1545 .extra_options
1546 .get(APPEND_MODE_KEY)
1547 .map(String::as_str)
1548 );
1549 assert!(!new_meta.options.extra_options.contains_key(MERGE_MODE_KEY));
1550 }
1551
1552 #[test]
1553 fn test_set_append_mode_false_keeps_merge_mode_option() {
1554 let schema = Arc::new(new_test_schema());
1555 let mut table_options = TableOptions::default();
1556 table_options
1557 .extra_options
1558 .insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1559 let meta = TableMetaBuilder::empty()
1560 .schema(schema)
1561 .primary_key_indices(vec![0])
1562 .engine("engine")
1563 .next_column_id(3)
1564 .options(table_options)
1565 .build()
1566 .unwrap();
1567
1568 let alter_kind = AlterKind::SetTableOptions {
1569 options: vec![SetRegionOption::AppendMode(false)],
1570 };
1571 let new_meta = meta
1572 .builder_with_alter_kind("my_table", &alter_kind)
1573 .unwrap()
1574 .build()
1575 .unwrap();
1576
1577 assert_eq!(
1578 Some("false"),
1579 new_meta
1580 .options
1581 .extra_options
1582 .get(APPEND_MODE_KEY)
1583 .map(String::as_str)
1584 );
1585 assert_eq!(
1586 Some("last_non_null"),
1587 new_meta
1588 .options
1589 .extra_options
1590 .get(MERGE_MODE_KEY)
1591 .map(String::as_str)
1592 );
1593 }
1594
1595 #[test]
1596 fn test_add_columns_multiple_times() {
1597 let schema = Arc::new(new_test_schema());
1598 let meta = TableMetaBuilder::empty()
1599 .schema(schema)
1600 .primary_key_indices(vec![0])
1601 .engine("engine")
1602 .next_column_id(3)
1603 .build()
1604 .unwrap();
1605
1606 let alter_kind = AlterKind::AddColumns {
1607 columns: vec![
1608 AddColumnRequest {
1609 column_schema: ColumnSchema::new(
1610 "col3",
1611 ConcreteDataType::int32_datatype(),
1612 true,
1613 ),
1614 is_key: true,
1615 location: None,
1616 add_if_not_exists: true,
1617 },
1618 AddColumnRequest {
1619 column_schema: ColumnSchema::new(
1620 "col3",
1621 ConcreteDataType::int32_datatype(),
1622 true,
1623 ),
1624 is_key: true,
1625 location: None,
1626 add_if_not_exists: true,
1627 },
1628 ],
1629 };
1630 let err = meta
1631 .builder_with_alter_kind("my_table", &alter_kind)
1632 .err()
1633 .unwrap();
1634 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1635 }
1636
1637 #[test]
1638 fn test_remove_columns() {
1639 let schema = Arc::new(new_test_schema());
1640 let meta = TableMetaBuilder::empty()
1641 .schema(schema.clone())
1642 .primary_key_indices(vec![0])
1643 .engine("engine")
1644 .next_column_id(3)
1645 .build()
1646 .unwrap();
1647 let meta = add_columns_to_meta(&meta);
1649
1650 let alter_kind = AlterKind::DropColumns {
1651 names: vec![String::from("col2"), String::from("my_field")],
1652 };
1653 let new_meta = meta
1654 .builder_with_alter_kind("my_table", &alter_kind)
1655 .unwrap()
1656 .build()
1657 .unwrap();
1658
1659 let names: Vec<String> = new_meta
1660 .schema
1661 .column_schemas()
1662 .iter()
1663 .map(|column_schema| column_schema.name.clone())
1664 .collect();
1665 assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1666 assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1667 assert_eq!(&[1], &new_meta.value_indices[..]);
1668 assert_eq!(
1669 schema.timestamp_column(),
1670 new_meta.schema.timestamp_column()
1671 );
1672 }
1673
1674 #[test]
1675 fn test_remove_multiple_columns_before_timestamp() {
1676 let column_schemas = vec![
1677 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1678 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1679 ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1680 ColumnSchema::new(
1681 "ts",
1682 ConcreteDataType::timestamp_millisecond_datatype(),
1683 false,
1684 )
1685 .with_time_index(true),
1686 ];
1687 let schema = Arc::new(
1688 SchemaBuilder::try_from(column_schemas)
1689 .unwrap()
1690 .version(123)
1691 .build()
1692 .unwrap(),
1693 );
1694 let meta = TableMetaBuilder::empty()
1695 .schema(schema.clone())
1696 .primary_key_indices(vec![1])
1697 .engine("engine")
1698 .next_column_id(4)
1699 .build()
1700 .unwrap();
1701
1702 let alter_kind = AlterKind::DropColumns {
1704 names: vec![String::from("col3"), String::from("col1")],
1705 };
1706 let new_meta = meta
1707 .builder_with_alter_kind("my_table", &alter_kind)
1708 .unwrap()
1709 .build()
1710 .unwrap();
1711
1712 let names: Vec<String> = new_meta
1713 .schema
1714 .column_schemas()
1715 .iter()
1716 .map(|column_schema| column_schema.name.clone())
1717 .collect();
1718 assert_eq!(&["col2", "ts"], &names[..]);
1719 assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1720 assert_eq!(&[1], &new_meta.value_indices[..]);
1721 assert_eq!(
1722 schema.timestamp_column(),
1723 new_meta.schema.timestamp_column()
1724 );
1725 }
1726
1727 #[test]
1728 fn test_add_existing_column() {
1729 let schema = Arc::new(new_test_schema());
1730 let meta = TableMetaBuilder::empty()
1731 .schema(schema)
1732 .primary_key_indices(vec![0])
1733 .engine("engine")
1734 .next_column_id(3)
1735 .build()
1736 .unwrap();
1737
1738 let alter_kind = AlterKind::AddColumns {
1739 columns: vec![AddColumnRequest {
1740 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1741 is_key: false,
1742 location: None,
1743 add_if_not_exists: false,
1744 }],
1745 };
1746
1747 let err = meta
1748 .builder_with_alter_kind("my_table", &alter_kind)
1749 .err()
1750 .unwrap();
1751 assert_eq!(StatusCode::TableColumnExists, err.status_code());
1752
1753 let alter_kind = AlterKind::AddColumns {
1755 columns: vec![AddColumnRequest {
1756 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1757 is_key: true,
1758 location: None,
1759 add_if_not_exists: true,
1760 }],
1761 };
1762 let new_meta = meta
1763 .builder_with_alter_kind("my_table", &alter_kind)
1764 .unwrap()
1765 .build()
1766 .unwrap();
1767 assert_eq!(
1768 meta.schema.column_schemas(),
1769 new_meta.schema.column_schemas()
1770 );
1771 assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1772 }
1773
1774 #[test]
1775 fn test_add_different_type_column() {
1776 let schema = Arc::new(new_test_schema());
1777 let meta = TableMetaBuilder::empty()
1778 .schema(schema)
1779 .primary_key_indices(vec![0])
1780 .engine("engine")
1781 .next_column_id(3)
1782 .build()
1783 .unwrap();
1784
1785 let alter_kind = AlterKind::AddColumns {
1787 columns: vec![AddColumnRequest {
1788 column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1789 is_key: false,
1790 location: None,
1791 add_if_not_exists: true,
1792 }],
1793 };
1794 let err = meta
1795 .builder_with_alter_kind("my_table", &alter_kind)
1796 .err()
1797 .unwrap();
1798 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1799 }
1800
1801 #[test]
1802 fn test_add_invalid_column() {
1803 let schema = Arc::new(new_test_schema());
1804 let meta = TableMetaBuilder::empty()
1805 .schema(schema)
1806 .primary_key_indices(vec![0])
1807 .engine("engine")
1808 .next_column_id(3)
1809 .build()
1810 .unwrap();
1811
1812 let alter_kind = AlterKind::AddColumns {
1814 columns: vec![AddColumnRequest {
1815 column_schema: ColumnSchema::new(
1816 "weny",
1817 ConcreteDataType::string_datatype(),
1818 false,
1819 ),
1820 is_key: false,
1821 location: None,
1822 add_if_not_exists: false,
1823 }],
1824 };
1825
1826 let err = meta
1827 .builder_with_alter_kind("my_table", &alter_kind)
1828 .err()
1829 .unwrap();
1830 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1831 }
1832
1833 #[test]
1834 fn test_remove_unknown_column() {
1835 let schema = Arc::new(new_test_schema());
1836 let meta = TableMetaBuilder::empty()
1837 .schema(schema)
1838 .primary_key_indices(vec![0])
1839 .engine("engine")
1840 .next_column_id(3)
1841 .build()
1842 .unwrap();
1843
1844 let alter_kind = AlterKind::DropColumns {
1845 names: vec![String::from("unknown")],
1846 };
1847
1848 let err = meta
1849 .builder_with_alter_kind("my_table", &alter_kind)
1850 .err()
1851 .unwrap();
1852 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1853 }
1854
1855 #[test]
1856 fn test_change_unknown_column_data_type() {
1857 let schema = Arc::new(new_test_schema());
1858 let meta = TableMetaBuilder::empty()
1859 .schema(schema)
1860 .primary_key_indices(vec![0])
1861 .engine("engine")
1862 .next_column_id(3)
1863 .build()
1864 .unwrap();
1865
1866 let alter_kind = AlterKind::ModifyColumnTypes {
1867 columns: vec![ModifyColumnTypeRequest {
1868 column_name: "unknown".to_string(),
1869 target_type: ConcreteDataType::string_datatype(),
1870 }],
1871 };
1872
1873 let err = meta
1874 .builder_with_alter_kind("my_table", &alter_kind)
1875 .err()
1876 .unwrap();
1877 assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1878 }
1879
1880 #[test]
1881 fn test_remove_key_column() {
1882 let schema = Arc::new(new_test_schema());
1883 let meta = TableMetaBuilder::empty()
1884 .schema(schema)
1885 .primary_key_indices(vec![0])
1886 .engine("engine")
1887 .next_column_id(3)
1888 .build()
1889 .unwrap();
1890
1891 let alter_kind = AlterKind::DropColumns {
1893 names: vec![String::from("col1")],
1894 };
1895
1896 let err = meta
1897 .builder_with_alter_kind("my_table", &alter_kind)
1898 .err()
1899 .unwrap();
1900 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1901
1902 let alter_kind = AlterKind::DropColumns {
1904 names: vec![String::from("ts")],
1905 };
1906
1907 let err = meta
1908 .builder_with_alter_kind("my_table", &alter_kind)
1909 .err()
1910 .unwrap();
1911 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1912 }
1913
1914 #[test]
1915 fn test_remove_partition_column() {
1916 let schema = Arc::new(new_test_schema());
1917 let meta = TableMetaBuilder::empty()
1918 .schema(schema)
1919 .primary_key_indices(vec![])
1920 .partition_key_indices(vec![0])
1921 .engine("engine")
1922 .next_column_id(3)
1923 .build()
1924 .unwrap();
1925 let alter_kind = AlterKind::DropColumns {
1927 names: vec![String::from("col1")],
1928 };
1929
1930 let err = meta
1931 .builder_with_alter_kind("my_table", &alter_kind)
1932 .err()
1933 .unwrap();
1934 assert_matches!(err, Error::RemovePartitionColumn { .. });
1935 }
1936
1937 #[test]
1938 fn test_change_key_column_data_type() {
1939 let schema = Arc::new(new_test_schema());
1940 let meta = TableMetaBuilder::empty()
1941 .schema(schema)
1942 .primary_key_indices(vec![0])
1943 .engine("engine")
1944 .next_column_id(3)
1945 .build()
1946 .unwrap();
1947
1948 let alter_kind = AlterKind::ModifyColumnTypes {
1950 columns: vec![ModifyColumnTypeRequest {
1951 column_name: "col1".to_string(),
1952 target_type: ConcreteDataType::string_datatype(),
1953 }],
1954 };
1955
1956 let err = meta
1957 .builder_with_alter_kind("my_table", &alter_kind)
1958 .err()
1959 .unwrap();
1960 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1961
1962 let alter_kind = AlterKind::ModifyColumnTypes {
1964 columns: vec![ModifyColumnTypeRequest {
1965 column_name: "ts".to_string(),
1966 target_type: ConcreteDataType::string_datatype(),
1967 }],
1968 };
1969
1970 let err = meta
1971 .builder_with_alter_kind("my_table", &alter_kind)
1972 .err()
1973 .unwrap();
1974 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1975 }
1976
1977 #[test]
1978 fn test_alloc_new_column() {
1979 let schema = Arc::new(new_test_schema());
1980 let mut meta = TableMetaBuilder::empty()
1981 .schema(schema)
1982 .primary_key_indices(vec![0])
1983 .engine("engine")
1984 .next_column_id(3)
1985 .build()
1986 .unwrap();
1987 assert_eq!(3, meta.next_column_id);
1988
1989 let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1990 let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1991
1992 assert_eq!(4, meta.next_column_id);
1993 assert_eq!(column_schema.name, desc.name);
1994 }
1995
1996 #[test]
1997 fn test_add_columns_with_location() {
1998 let schema = Arc::new(new_test_schema());
1999 let meta = TableMetaBuilder::empty()
2000 .schema(schema)
2001 .primary_key_indices(vec![0])
2002 .partition_key_indices(vec![0, 2])
2004 .engine("engine")
2005 .next_column_id(3)
2006 .build()
2007 .unwrap();
2008
2009 let new_meta = add_columns_to_meta_with_location(&meta);
2010 let names: Vec<String> = new_meta
2011 .schema
2012 .column_schemas()
2013 .iter()
2014 .map(|column_schema| column_schema.name.clone())
2015 .collect();
2016 assert_eq!(
2017 &[
2018 "my_tag_first", "col1", "ts", "yet_another_field_after_ts", "my_field_after_ts", "col2", ],
2025 &names[..]
2026 );
2027 assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2028 assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2029 assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2030 }
2031
2032 #[test]
2033 fn test_modify_column_fulltext_options() {
2034 let schema = Arc::new(new_test_schema());
2035 let meta = TableMetaBuilder::empty()
2036 .schema(schema)
2037 .primary_key_indices(vec![0])
2038 .engine("engine")
2039 .next_column_id(3)
2040 .build()
2041 .unwrap();
2042
2043 let alter_kind = AlterKind::SetIndexes {
2044 options: vec![SetIndexOption::Fulltext {
2045 column_name: "col1".to_string(),
2046 options: FulltextOptions::default(),
2047 }],
2048 };
2049 let err = meta
2050 .builder_with_alter_kind("my_table", &alter_kind)
2051 .err()
2052 .unwrap();
2053 assert_eq!(
2054 "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2055 err.to_string()
2056 );
2057
2058 let new_meta = add_columns_to_meta_with_location(&meta);
2060 let alter_kind = AlterKind::SetIndexes {
2061 options: vec![SetIndexOption::Fulltext {
2062 column_name: "my_tag_first".to_string(),
2063 options: FulltextOptions::new_unchecked(
2064 true,
2065 FulltextAnalyzer::Chinese,
2066 true,
2067 FulltextBackend::Bloom,
2068 1000,
2069 0.01,
2070 ),
2071 }],
2072 };
2073 let new_meta = new_meta
2074 .builder_with_alter_kind("my_table", &alter_kind)
2075 .unwrap()
2076 .build()
2077 .unwrap();
2078 let column_schema = new_meta
2079 .schema
2080 .column_schema_by_name("my_tag_first")
2081 .unwrap();
2082 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2083 assert!(fulltext_options.enable);
2084 assert_eq!(
2085 datatypes::schema::FulltextAnalyzer::Chinese,
2086 fulltext_options.analyzer
2087 );
2088 assert!(fulltext_options.case_sensitive);
2089
2090 let alter_kind = AlterKind::UnsetIndexes {
2091 options: vec![UnsetIndexOption::Fulltext {
2092 column_name: "my_tag_first".to_string(),
2093 }],
2094 };
2095 let new_meta = new_meta
2096 .builder_with_alter_kind("my_table", &alter_kind)
2097 .unwrap()
2098 .build()
2099 .unwrap();
2100 let column_schema = new_meta
2101 .schema
2102 .column_schema_by_name("my_tag_first")
2103 .unwrap();
2104 let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2105 assert!(!fulltext_options.enable);
2106 }
2107
2108 #[test]
2109 fn test_table_info_serde_compatibility() {
2110 let serialized = r#"{"ident":{"table_id":1024,"version":1},"name":"foo","desc":"my table","catalog_name":"greptime","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"col1","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"col2","data_type":{"Int32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":1,"version":123},"primary_key_indices":[0],"value_indices":[1,2],"engine":"mito","next_column_id":3,"options":{"write_buffer_size":null,"ttl":"1h","skip_wal":false,"extra_options":{}},"created_on":"-262143-01-01T00:00:00Z","updated_on":"+262142-12-31T23:59:59.999999999Z","partition_key_indices":[2],"column_ids":[0,1,2]},"table_type":"Base"}"#;
2143
2144 let actual: TableInfo = serde_json::from_str(serialized).unwrap();
2145 let expected = TableInfo {
2146 ident: TableIdent {
2147 table_id: 1024,
2148 version: 1,
2149 },
2150 name: "foo".to_string(),
2151 desc: Some("my table".to_string()),
2152 catalog_name: "greptime".to_string(),
2153 schema_name: "public".to_string(),
2154 meta: TableMeta {
2155 schema: Arc::new(new_test_schema()),
2156 primary_key_indices: vec![0],
2157 value_indices: vec![1, 2],
2158 engine: "mito".to_string(),
2159 next_column_id: 3,
2160 options: TableOptions {
2161 ttl: Some(common_time::TimeToLive::Duration(
2162 std::time::Duration::from_secs(3600),
2163 )),
2164 ..Default::default()
2165 },
2166 created_on: DateTime::<Utc>::MIN_UTC,
2167 updated_on: DateTime::<Utc>::MAX_UTC,
2168 partition_key_indices: vec![2],
2169 column_ids: vec![0, 1, 2],
2170 },
2171 table_type: TableType::Base,
2172 };
2173 assert_eq!(actual, expected);
2174 }
2175}