table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39    TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46/// Indicates whether and how a filter expression can be handled by a
47/// Table for table scans.
48#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50    /// The expression cannot be used by the provider.
51    Unsupported,
52    /// The expression can be used to help minimise the data retrieved,
53    /// but the provider cannot guarantee that all returned tuples
54    /// satisfy the filter. The Filter plan node containing this expression
55    /// will be preserved.
56    Inexact,
57    /// The provider guarantees that all returned data satisfies this
58    /// filter expression. The Filter plan node containing this expression
59    /// will be removed.
60    Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
83/// Indicates the type of this table for metadata/catalog purposes.
84#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86    /// An ordinary physical table.
87    Base,
88    /// A non-materialised table that itself uses a query internally to provide data.
89    View,
90    /// A transient table.
91    Temporary,
92}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105    fn from(t: TableType) -> datafusion::datasource::TableType {
106        match t {
107            TableType::Base => datafusion::datasource::TableType::Base,
108            TableType::View => datafusion::datasource::TableType::View,
109            TableType::Temporary => datafusion::datasource::TableType::Temporary,
110        }
111    }
112}
113
114/// Identifier of the table.
115#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
116pub struct TableIdent {
117    /// Unique id of this table.
118    pub table_id: TableId,
119    /// Version of the table, bumped when metadata (such as schema) of the table
120    /// being changed.
121    pub version: TableVersion,
122}
123
124/// The table metadata.
125///
126/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
127#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
128#[builder(pattern = "mutable", custom_constructor)]
129pub struct TableMeta {
130    pub schema: SchemaRef,
131    /// The indices of columns in primary key. Note that the index of timestamp column
132    /// is not included in these indices.
133    pub primary_key_indices: Vec<usize>,
134    #[builder(default = "self.default_value_indices()?")]
135    pub value_indices: Vec<usize>,
136    #[builder(default, setter(into))]
137    pub engine: String,
138    #[builder(default, setter(into))]
139    pub region_numbers: Vec<u32>,
140    pub next_column_id: ColumnId,
141    /// Table options.
142    #[builder(default)]
143    pub options: TableOptions,
144    #[builder(default = "Utc::now()")]
145    pub created_on: DateTime<Utc>,
146    #[builder(default = "Vec::new()")]
147    pub partition_key_indices: Vec<usize>,
148    #[builder(default = "Vec::new()")]
149    pub column_ids: Vec<ColumnId>,
150}
151
152impl TableMetaBuilder {
153    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
154    #[cfg(any(test, feature = "testing"))]
155    pub fn empty() -> Self {
156        Self {
157            schema: None,
158            primary_key_indices: None,
159            value_indices: None,
160            engine: None,
161            region_numbers: None,
162            next_column_id: None,
163            options: None,
164            created_on: None,
165            partition_key_indices: None,
166            column_ids: None,
167        }
168    }
169}
170
171impl TableMetaBuilder {
172    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
173        match (&self.primary_key_indices, &self.schema) {
174            (Some(v), Some(schema)) => {
175                let column_schemas = schema.column_schemas();
176                Ok((0..column_schemas.len())
177                    .filter(|idx| !v.contains(idx))
178                    .collect())
179            }
180            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
181        }
182    }
183
184    pub fn new_external_table() -> Self {
185        Self {
186            schema: None,
187            primary_key_indices: Some(Vec::new()),
188            value_indices: Some(Vec::new()),
189            engine: None,
190            region_numbers: Some(Vec::new()),
191            next_column_id: Some(0),
192            options: None,
193            created_on: None,
194            partition_key_indices: None,
195            column_ids: None,
196        }
197    }
198}
199
200/// The result after splitting requests by column location info.
201struct SplitResult<'a> {
202    /// column requests should be added at first place.
203    columns_at_first: Vec<&'a AddColumnRequest>,
204    /// column requests should be added after already exist columns.
205    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
206    /// column requests should be added at last place.
207    columns_at_last: Vec<&'a AddColumnRequest>,
208    /// all column names should be added.
209    column_names: Vec<String>,
210}
211
212impl TableMeta {
213    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
214        let columns_schemas = &self.schema.column_schemas();
215        self.primary_key_indices
216            .iter()
217            .map(|idx| &columns_schemas[*idx].name)
218    }
219
220    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
221        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
222        let columns_schemas = self.schema.column_schemas();
223        let primary_key_indices = &self.primary_key_indices;
224        columns_schemas
225            .iter()
226            .enumerate()
227            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
228            .map(|(_, cs)| &cs.name)
229    }
230
231    pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
232        let columns_schemas = &self.schema.column_schemas();
233        self.partition_key_indices
234            .iter()
235            .map(|idx| &columns_schemas[*idx].name)
236    }
237
238    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
239    ///
240    /// The returned builder would derive the next column id of this meta.
241    pub fn builder_with_alter_kind(
242        &self,
243        table_name: &str,
244        alter_kind: &AlterKind,
245    ) -> Result<TableMetaBuilder> {
246        match alter_kind {
247            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
248            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
249            AlterKind::ModifyColumnTypes { columns } => {
250                self.modify_column_types(table_name, columns)
251            }
252            // No need to rebuild table meta when renaming tables.
253            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
254            AlterKind::SetTableOptions { options } => self.set_table_options(options),
255            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
256            AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
257            AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
258            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
259            AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
260        }
261    }
262
263    /// Creates a [TableMetaBuilder] with modified table options.
264    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
265        let mut new_options = self.options.clone();
266
267        for request in requests {
268            match request {
269                SetRegionOption::Ttl(new_ttl) => {
270                    new_options.ttl = *new_ttl;
271                }
272                SetRegionOption::Twsc(key, value) => {
273                    if !value.is_empty() {
274                        new_options.extra_options.insert(key.clone(), value.clone());
275                        // Ensure node restart correctly.
276                        new_options.extra_options.insert(
277                            COMPACTION_TYPE.to_string(),
278                            COMPACTION_TYPE_TWCS.to_string(),
279                        );
280                    } else {
281                        // Invalidate the previous change option if an empty value has been set.
282                        new_options.extra_options.remove(key.as_str());
283                    }
284                }
285            }
286        }
287        let mut builder = self.new_meta_builder();
288        builder.options(new_options);
289
290        Ok(builder)
291    }
292
293    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
294        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
295        self.set_table_options(&requests)
296    }
297
298    fn set_indexes(
299        &self,
300        table_name: &str,
301        requests: &[SetIndexOption],
302    ) -> Result<TableMetaBuilder> {
303        let table_schema = &self.schema;
304        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
305        for request in requests {
306            let column_name = request.column_name();
307            table_schema
308                .column_index_by_name(column_name)
309                .with_context(|| error::ColumnNotExistsSnafu {
310                    column_name,
311                    table_name,
312                })?;
313            set_index_options
314                .entry(column_name)
315                .or_default()
316                .push(request);
317        }
318
319        let mut meta_builder = self.new_meta_builder();
320        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
321        for mut column in table_schema.column_schemas().iter().cloned() {
322            if let Some(request) = set_index_options.get(column.name.as_str()) {
323                for request in request {
324                    self.set_index(&mut column, request)?;
325                }
326            }
327            columns.push(column);
328        }
329
330        let mut builder = SchemaBuilder::try_from_columns(columns)
331            .with_context(|_| error::SchemaBuildSnafu {
332                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
333            })?
334            .version(table_schema.version() + 1);
335
336        for (k, v) in table_schema.metadata().iter() {
337            builder = builder.add_metadata(k, v);
338        }
339
340        let new_schema = builder.build().with_context(|_| {
341            let column_names = requests
342                .iter()
343                .map(|request| request.column_name())
344                .collect::<Vec<_>>();
345            error::SchemaBuildSnafu {
346                msg: format!(
347                    "Table {table_name} cannot set index options with columns {column_names:?}",
348                ),
349            }
350        })?;
351        let _ = meta_builder
352            .schema(Arc::new(new_schema))
353            .primary_key_indices(self.primary_key_indices.clone());
354
355        Ok(meta_builder)
356    }
357
358    fn unset_indexes(
359        &self,
360        table_name: &str,
361        requests: &[UnsetIndexOption],
362    ) -> Result<TableMetaBuilder> {
363        let table_schema = &self.schema;
364        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
365        for request in requests {
366            let column_name = request.column_name();
367            table_schema
368                .column_index_by_name(column_name)
369                .with_context(|| error::ColumnNotExistsSnafu {
370                    column_name,
371                    table_name,
372                })?;
373            set_index_options
374                .entry(column_name)
375                .or_default()
376                .push(request);
377        }
378
379        let mut meta_builder = self.new_meta_builder();
380        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
381        for mut column in table_schema.column_schemas().iter().cloned() {
382            if let Some(request) = set_index_options.get(column.name.as_str()) {
383                for request in request {
384                    self.unset_index(&mut column, request)?;
385                }
386            }
387            columns.push(column);
388        }
389
390        let mut builder = SchemaBuilder::try_from_columns(columns)
391            .with_context(|_| error::SchemaBuildSnafu {
392                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
393            })?
394            .version(table_schema.version() + 1);
395
396        for (k, v) in table_schema.metadata().iter() {
397            builder = builder.add_metadata(k, v);
398        }
399
400        let new_schema = builder.build().with_context(|_| {
401            let column_names = requests
402                .iter()
403                .map(|request| request.column_name())
404                .collect::<Vec<_>>();
405            error::SchemaBuildSnafu {
406                msg: format!(
407                    "Table {table_name} cannot set index options with columns {column_names:?}",
408                ),
409            }
410        })?;
411        let _ = meta_builder
412            .schema(Arc::new(new_schema))
413            .primary_key_indices(self.primary_key_indices.clone());
414
415        Ok(meta_builder)
416    }
417
418    fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
419        match request {
420            SetIndexOption::Fulltext {
421                column_name,
422                options,
423            } => {
424                ensure!(
425                    column_schema.data_type.is_string(),
426                    error::InvalidColumnOptionSnafu {
427                        column_name,
428                        msg: "FULLTEXT index only supports string type",
429                    }
430                );
431
432                let current_fulltext_options = column_schema
433                    .fulltext_options()
434                    .context(error::SetFulltextOptionsSnafu { column_name })?;
435                set_column_fulltext_options(
436                    column_schema,
437                    column_name,
438                    options,
439                    current_fulltext_options,
440                )?;
441            }
442            SetIndexOption::Inverted { column_name } => {
443                debug_assert_eq!(column_schema.name, *column_name);
444                column_schema.set_inverted_index(true);
445            }
446            SetIndexOption::Skipping {
447                column_name,
448                options,
449            } => {
450                set_column_skipping_index_options(column_schema, column_name, options)?;
451            }
452        }
453
454        Ok(())
455    }
456
457    fn unset_index(
458        &self,
459        column_schema: &mut ColumnSchema,
460        request: &UnsetIndexOption,
461    ) -> Result<()> {
462        match request {
463            UnsetIndexOption::Fulltext { column_name } => {
464                let current_fulltext_options = column_schema
465                    .fulltext_options()
466                    .context(error::SetFulltextOptionsSnafu { column_name })?;
467                unset_column_fulltext_options(
468                    column_schema,
469                    column_name,
470                    current_fulltext_options.clone(),
471                )?
472            }
473            UnsetIndexOption::Inverted { .. } => {
474                column_schema.set_inverted_index(false);
475            }
476            UnsetIndexOption::Skipping { column_name } => {
477                unset_column_skipping_index_options(column_schema, column_name)?;
478            }
479        }
480
481        Ok(())
482    }
483
484    // TODO(yingwen): Remove this.
485    /// Allocate a new column for the table.
486    ///
487    /// This method would bump the `next_column_id` of the meta.
488    pub fn alloc_new_column(
489        &mut self,
490        table_name: &str,
491        new_column: &ColumnSchema,
492    ) -> Result<ColumnDescriptor> {
493        let desc = ColumnDescriptorBuilder::new(
494            self.next_column_id as ColumnId,
495            &new_column.name,
496            new_column.data_type.clone(),
497        )
498        .is_nullable(new_column.is_nullable())
499        .default_constraint(new_column.default_constraint().cloned())
500        .build()
501        .context(error::BuildColumnDescriptorSnafu {
502            table_name,
503            column_name: &new_column.name,
504        })?;
505
506        // Bump next column id.
507        self.next_column_id += 1;
508
509        Ok(desc)
510    }
511
512    /// Create a [`TableMetaBuilder`] from the current TableMeta.
513    fn new_meta_builder(&self) -> TableMetaBuilder {
514        let mut builder = TableMetaBuilder::from(self);
515        // Manually remove value_indices.
516        builder.value_indices = None;
517        builder
518    }
519
520    // TODO(yingwen): Tests add if not exists.
521    fn add_columns(
522        &self,
523        table_name: &str,
524        requests: &[AddColumnRequest],
525    ) -> Result<TableMetaBuilder> {
526        let table_schema = &self.schema;
527        let mut meta_builder = self.new_meta_builder();
528        let original_primary_key_indices: HashSet<&usize> =
529            self.primary_key_indices.iter().collect();
530
531        let mut names = HashSet::with_capacity(requests.len());
532        let mut new_columns = Vec::with_capacity(requests.len());
533        for col_to_add in requests {
534            if let Some(column_schema) =
535                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
536            {
537                // If the column already exists.
538                ensure!(
539                    col_to_add.add_if_not_exists,
540                    error::ColumnExistsSnafu {
541                        table_name,
542                        column_name: &col_to_add.column_schema.name
543                    },
544                );
545
546                // Checks if the type is the same
547                ensure!(
548                    column_schema.data_type == col_to_add.column_schema.data_type,
549                    error::InvalidAlterRequestSnafu {
550                        table: table_name,
551                        err: format!(
552                            "column {} already exists with different type {:?}",
553                            col_to_add.column_schema.name, column_schema.data_type,
554                        ),
555                    }
556                );
557            } else {
558                // A new column.
559                // Ensures we only add a column once.
560                ensure!(
561                    names.insert(&col_to_add.column_schema.name),
562                    error::InvalidAlterRequestSnafu {
563                        table: table_name,
564                        err: format!(
565                            "add column {} more than once",
566                            col_to_add.column_schema.name
567                        ),
568                    }
569                );
570
571                ensure!(
572                    col_to_add.column_schema.is_nullable()
573                        || col_to_add.column_schema.default_constraint().is_some(),
574                    error::InvalidAlterRequestSnafu {
575                        table: table_name,
576                        err: format!(
577                            "no default value for column {}",
578                            col_to_add.column_schema.name
579                        ),
580                    },
581                );
582
583                new_columns.push(col_to_add.clone());
584            }
585        }
586        let requests = &new_columns[..];
587
588        let SplitResult {
589            columns_at_first,
590            columns_at_after,
591            columns_at_last,
592            column_names,
593        } = self.split_requests_by_column_location(table_name, requests)?;
594        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
595        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
596        // add new columns with FIRST, and in reverse order of requests.
597        columns_at_first.iter().rev().for_each(|request| {
598            if request.is_key {
599                // If a key column is added, we also need to store its index in primary_key_indices.
600                primary_key_indices.push(columns.len());
601            }
602            columns.push(request.column_schema.clone());
603        });
604        // add existed columns in original order and handle new columns with AFTER.
605        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
606            if original_primary_key_indices.contains(&index) {
607                primary_key_indices.push(columns.len());
608            }
609            columns.push(column_schema.clone());
610            if let Some(requests) = columns_at_after.get(&column_schema.name) {
611                requests.iter().rev().for_each(|request| {
612                    if request.is_key {
613                        // If a key column is added, we also need to store its index in primary_key_indices.
614                        primary_key_indices.push(columns.len());
615                    }
616                    columns.push(request.column_schema.clone());
617                });
618            }
619        }
620        // add new columns without location info to last.
621        columns_at_last.iter().for_each(|request| {
622            if request.is_key {
623                // If a key column is added, we also need to store its index in primary_key_indices.
624                primary_key_indices.push(columns.len());
625            }
626            columns.push(request.column_schema.clone());
627        });
628
629        let mut builder = SchemaBuilder::try_from(columns)
630            .with_context(|_| error::SchemaBuildSnafu {
631                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
632            })?
633            // Also bump the schema version.
634            .version(table_schema.version() + 1);
635        for (k, v) in table_schema.metadata().iter() {
636            builder = builder.add_metadata(k, v);
637        }
638        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
639            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
640        })?;
641
642        let partition_key_indices = self
643            .partition_key_indices
644            .iter()
645            .map(|idx| table_schema.column_name_by_index(*idx))
646            // This unwrap is safe since we only add new columns.
647            .map(|name| new_schema.column_index_by_name(name).unwrap())
648            .collect();
649
650        // value_indices would be generated automatically.
651        let _ = meta_builder
652            .schema(Arc::new(new_schema))
653            .primary_key_indices(primary_key_indices)
654            .partition_key_indices(partition_key_indices);
655
656        Ok(meta_builder)
657    }
658
659    fn remove_columns(
660        &self,
661        table_name: &str,
662        column_names: &[String],
663    ) -> Result<TableMetaBuilder> {
664        let table_schema = &self.schema;
665        let column_names: HashSet<_> = column_names.iter().collect();
666        let mut meta_builder = self.new_meta_builder();
667
668        let timestamp_index = table_schema.timestamp_index();
669        // Check whether columns are existing and not in primary key index.
670        for column_name in &column_names {
671            if let Some(index) = table_schema.column_index_by_name(column_name) {
672                // This is a linear search, but since there won't be too much columns, the performance should
673                // be acceptable.
674                ensure!(
675                    !self.primary_key_indices.contains(&index),
676                    error::RemoveColumnInIndexSnafu {
677                        column_name: *column_name,
678                        table_name,
679                    }
680                );
681
682                ensure!(
683                    !self.partition_key_indices.contains(&index),
684                    error::RemovePartitionColumnSnafu {
685                        column_name: *column_name,
686                        table_name,
687                    }
688                );
689
690                if let Some(ts_index) = timestamp_index {
691                    // Not allowed to remove column in timestamp index.
692                    ensure!(
693                        index != ts_index,
694                        error::RemoveColumnInIndexSnafu {
695                            column_name: table_schema.column_name_by_index(ts_index),
696                            table_name,
697                        }
698                    );
699                }
700            } else {
701                return error::ColumnNotExistsSnafu {
702                    column_name: *column_name,
703                    table_name,
704                }
705                .fail()?;
706            }
707        }
708
709        // Collect columns after removal.
710        let columns: Vec<_> = table_schema
711            .column_schemas()
712            .iter()
713            .filter(|column_schema| !column_names.contains(&column_schema.name))
714            .cloned()
715            .collect();
716
717        let mut builder = SchemaBuilder::try_from_columns(columns)
718            .with_context(|_| error::SchemaBuildSnafu {
719                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
720            })?
721            // Also bump the schema version.
722            .version(table_schema.version() + 1);
723        for (k, v) in table_schema.metadata().iter() {
724            builder = builder.add_metadata(k, v);
725        }
726        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
727            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
728        })?;
729
730        // Rebuild the indices of primary key columns.
731        let primary_key_indices = self
732            .primary_key_indices
733            .iter()
734            .map(|idx| table_schema.column_name_by_index(*idx))
735            // This unwrap is safe since we don't allow removing a primary key column.
736            .map(|name| new_schema.column_index_by_name(name).unwrap())
737            .collect();
738
739        let partition_key_indices = self
740            .partition_key_indices
741            .iter()
742            .map(|idx| table_schema.column_name_by_index(*idx))
743            // This unwrap is safe since we don't allow removing a partition key column.
744            .map(|name| new_schema.column_index_by_name(name).unwrap())
745            .collect();
746
747        let _ = meta_builder
748            .schema(Arc::new(new_schema))
749            .primary_key_indices(primary_key_indices)
750            .partition_key_indices(partition_key_indices);
751
752        Ok(meta_builder)
753    }
754
755    fn modify_column_types(
756        &self,
757        table_name: &str,
758        requests: &[ModifyColumnTypeRequest],
759    ) -> Result<TableMetaBuilder> {
760        let table_schema = &self.schema;
761        let mut meta_builder = self.new_meta_builder();
762
763        let mut modify_column_types = HashMap::with_capacity(requests.len());
764        let timestamp_index = table_schema.timestamp_index();
765
766        for col_to_change in requests {
767            let change_column_name = &col_to_change.column_name;
768
769            let index = table_schema
770                .column_index_by_name(change_column_name)
771                .with_context(|| error::ColumnNotExistsSnafu {
772                    column_name: change_column_name,
773                    table_name,
774                })?;
775
776            let column = &table_schema.column_schemas()[index];
777
778            ensure!(
779                !self.primary_key_indices.contains(&index),
780                error::InvalidAlterRequestSnafu {
781                    table: table_name,
782                    err: format!(
783                        "Not allowed to change primary key index column '{}'",
784                        column.name
785                    )
786                }
787            );
788
789            if let Some(ts_index) = timestamp_index {
790                // Not allowed to change column datatype in timestamp index.
791                ensure!(
792                    index != ts_index,
793                    error::InvalidAlterRequestSnafu {
794                        table: table_name,
795                        err: format!(
796                            "Not allowed to change timestamp index column '{}' datatype",
797                            column.name
798                        )
799                    }
800                );
801            }
802
803            ensure!(
804                modify_column_types
805                    .insert(&col_to_change.column_name, col_to_change)
806                    .is_none(),
807                error::InvalidAlterRequestSnafu {
808                    table: table_name,
809                    err: format!(
810                        "change column datatype {} more than once",
811                        col_to_change.column_name
812                    ),
813                }
814            );
815
816            ensure!(
817                column
818                    .data_type
819                    .can_arrow_type_cast_to(&col_to_change.target_type),
820                error::InvalidAlterRequestSnafu {
821                    table: table_name,
822                    err: format!(
823                        "column '{}' cannot be cast automatically to type '{}'",
824                        col_to_change.column_name, col_to_change.target_type,
825                    ),
826                }
827            );
828
829            ensure!(
830                column.is_nullable(),
831                error::InvalidAlterRequestSnafu {
832                    table: table_name,
833                    err: format!(
834                        "column '{}' must be nullable to ensure safe conversion.",
835                        col_to_change.column_name,
836                    ),
837                }
838            );
839        }
840        // Collect columns after changed.
841
842        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
843        for mut column in table_schema.column_schemas().iter().cloned() {
844            if let Some(change_column) = modify_column_types.get(&column.name) {
845                column.data_type = change_column.target_type.clone();
846                let new_default = if let Some(default_value) = column.default_constraint() {
847                    Some(
848                        default_value
849                            .cast_to_datatype(&change_column.target_type)
850                            .with_context(|_| error::CastDefaultValueSnafu {
851                                reason: format!(
852                                    "Failed to cast default value from {:?} to type {:?}",
853                                    default_value, &change_column.target_type
854                                ),
855                            })?,
856                    )
857                } else {
858                    None
859                };
860                column = column
861                    .clone()
862                    .with_default_constraint(new_default.clone())
863                    .with_context(|_| error::CastDefaultValueSnafu {
864                        reason: format!("Failed to set new default: {:?}", new_default),
865                    })?;
866            }
867            columns.push(column)
868        }
869
870        let mut builder = SchemaBuilder::try_from_columns(columns)
871            .with_context(|_| error::SchemaBuildSnafu {
872                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
873            })?
874            // Also bump the schema version.
875            .version(table_schema.version() + 1);
876        for (k, v) in table_schema.metadata().iter() {
877            builder = builder.add_metadata(k, v);
878        }
879        let new_schema = builder.build().with_context(|_| {
880            let column_names: Vec<_> = requests
881                .iter()
882                .map(|request| &request.column_name)
883                .collect();
884
885            error::SchemaBuildSnafu {
886                msg: format!(
887                    "Table {table_name} cannot change datatype with columns {column_names:?}"
888                ),
889            }
890        })?;
891
892        let _ = meta_builder
893            .schema(Arc::new(new_schema))
894            .primary_key_indices(self.primary_key_indices.clone());
895
896        Ok(meta_builder)
897    }
898
899    /// Split requests into different groups using column location info.
900    fn split_requests_by_column_location<'a>(
901        &self,
902        table_name: &str,
903        requests: &'a [AddColumnRequest],
904    ) -> Result<SplitResult<'a>> {
905        let table_schema = &self.schema;
906        let mut columns_at_first = Vec::new();
907        let mut columns_at_after = HashMap::new();
908        let mut columns_at_last = Vec::new();
909        let mut column_names = Vec::with_capacity(requests.len());
910        for request in requests {
911            // Check whether columns to add are already existing.
912            let column_name = &request.column_schema.name;
913            column_names.push(column_name.clone());
914            ensure!(
915                table_schema.column_schema_by_name(column_name).is_none(),
916                error::ColumnExistsSnafu {
917                    column_name,
918                    table_name,
919                }
920            );
921            match request.location.as_ref() {
922                Some(AddColumnLocation::First) => {
923                    columns_at_first.push(request);
924                }
925                Some(AddColumnLocation::After { column_name }) => {
926                    ensure!(
927                        table_schema.column_schema_by_name(column_name).is_some(),
928                        error::ColumnNotExistsSnafu {
929                            column_name,
930                            table_name,
931                        }
932                    );
933                    columns_at_after
934                        .entry(column_name.clone())
935                        .or_insert(Vec::new())
936                        .push(request);
937                }
938                None => {
939                    columns_at_last.push(request);
940                }
941            }
942        }
943        Ok(SplitResult {
944            columns_at_first,
945            columns_at_after,
946            columns_at_last,
947            column_names,
948        })
949    }
950
951    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
952        let table_schema = &self.schema;
953        let mut meta_builder = self.new_meta_builder();
954        let mut columns = Vec::with_capacity(table_schema.num_columns());
955        for column_schema in table_schema.column_schemas() {
956            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
957                // Drop default constraint.
958                ensure!(
959                    column_schema.default_constraint().is_some(),
960                    error::InvalidAlterRequestSnafu {
961                        table: table_name,
962                        err: format!("column {name} does not have a default value"),
963                    }
964                );
965                if !column_schema.is_nullable() {
966                    return error::InvalidAlterRequestSnafu {
967                        table: table_name,
968                        err: format!(
969                            "column {name} is not nullable and `default` cannot be dropped",
970                        ),
971                    }
972                    .fail();
973                }
974                let new_column_schema = column_schema.clone();
975                let new_column_schema = new_column_schema
976                    .with_default_constraint(None)
977                    .with_context(|_| error::SchemaBuildSnafu {
978                        msg: format!("Table {table_name} cannot drop default values"),
979                    })?;
980                columns.push(new_column_schema);
981            } else {
982                columns.push(column_schema.clone());
983            }
984        }
985
986        let mut builder = SchemaBuilder::try_from_columns(columns)
987            .with_context(|_| error::SchemaBuildSnafu {
988                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
989            })?
990            // Also bump the schema version.
991            .version(table_schema.version() + 1);
992        for (k, v) in table_schema.metadata().iter() {
993            builder = builder.add_metadata(k, v);
994        }
995        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
996            msg: format!("Table {table_name} cannot drop default values"),
997        })?;
998
999        let _ = meta_builder.schema(Arc::new(new_schema));
1000
1001        Ok(meta_builder)
1002    }
1003
1004    fn set_defaults(
1005        &self,
1006        table_name: &str,
1007        set_defaults: &[SetDefaultRequest],
1008    ) -> Result<TableMetaBuilder> {
1009        let table_schema = &self.schema;
1010        let mut meta_builder = self.new_meta_builder();
1011        let mut columns = Vec::with_capacity(table_schema.num_columns());
1012        for column_schema in table_schema.column_schemas() {
1013            if let Some(set_default) = set_defaults
1014                .iter()
1015                .find(|s| s.column_name == column_schema.name)
1016            {
1017                let new_column_schema = column_schema.clone();
1018                let new_column_schema = new_column_schema
1019                    .with_default_constraint(set_default.default_constraint.clone())
1020                    .with_context(|_| error::SchemaBuildSnafu {
1021                        msg: format!("Table {table_name} cannot set default values"),
1022                    })?;
1023                columns.push(new_column_schema);
1024            } else {
1025                columns.push(column_schema.clone());
1026            }
1027        }
1028
1029        let mut builder = SchemaBuilder::try_from_columns(columns)
1030            .with_context(|_| error::SchemaBuildSnafu {
1031                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1032            })?
1033            // Also bump the schema version.
1034            .version(table_schema.version() + 1);
1035        for (k, v) in table_schema.metadata().iter() {
1036            builder = builder.add_metadata(k, v);
1037        }
1038        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1039            msg: format!("Table {table_name} cannot set default values"),
1040        })?;
1041
1042        let _ = meta_builder.schema(Arc::new(new_schema));
1043
1044        Ok(meta_builder)
1045    }
1046}
1047
1048#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1049#[builder(pattern = "owned")]
1050pub struct TableInfo {
1051    /// Id and version of the table.
1052    #[builder(default, setter(into))]
1053    pub ident: TableIdent,
1054    /// Name of the table.
1055    #[builder(setter(into))]
1056    pub name: String,
1057    /// Comment of the table.
1058    #[builder(default, setter(into))]
1059    pub desc: Option<String>,
1060    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1061    pub catalog_name: String,
1062    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1063    pub schema_name: String,
1064    pub meta: TableMeta,
1065    #[builder(default = "TableType::Base")]
1066    pub table_type: TableType,
1067}
1068
1069pub type TableInfoRef = Arc<TableInfo>;
1070
1071impl TableInfo {
1072    pub fn table_id(&self) -> TableId {
1073        self.ident.table_id
1074    }
1075
1076    pub fn region_ids(&self) -> Vec<RegionId> {
1077        self.meta
1078            .region_numbers
1079            .iter()
1080            .map(|id| RegionId::new(self.table_id(), *id))
1081            .collect()
1082    }
1083    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1084    pub fn full_table_name(&self) -> String {
1085        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1086    }
1087
1088    pub fn get_db_string(&self) -> String {
1089        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1090    }
1091
1092    /// Returns true when the table is the metric engine's physical table.
1093    pub fn is_physical_table(&self) -> bool {
1094        self.meta
1095            .options
1096            .extra_options
1097            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1098    }
1099
1100    /// Return true if the table's TTL is `instant`.
1101    pub fn is_ttl_instant_table(&self) -> bool {
1102        self.meta
1103            .options
1104            .ttl
1105            .map(|t| t.is_instant())
1106            .unwrap_or(false)
1107    }
1108}
1109
1110impl TableInfoBuilder {
1111    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1112        Self {
1113            name: Some(name.into()),
1114            meta: Some(meta),
1115            ..Default::default()
1116        }
1117    }
1118
1119    pub fn table_id(mut self, id: TableId) -> Self {
1120        let ident = self.ident.get_or_insert_with(TableIdent::default);
1121        ident.table_id = id;
1122        self
1123    }
1124
1125    pub fn table_version(mut self, version: TableVersion) -> Self {
1126        let ident = self.ident.get_or_insert_with(TableIdent::default);
1127        ident.version = version;
1128        self
1129    }
1130}
1131
1132impl TableIdent {
1133    pub fn new(table_id: TableId) -> Self {
1134        Self {
1135            table_id,
1136            version: 0,
1137        }
1138    }
1139}
1140
1141impl From<TableId> for TableIdent {
1142    fn from(table_id: TableId) -> Self {
1143        Self::new(table_id)
1144    }
1145}
1146
1147/// Struct used to serialize and deserialize [`TableMeta`].
1148#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1149pub struct RawTableMeta {
1150    pub schema: RawSchema,
1151    /// The indices of columns in primary key. Note that the index of timestamp column
1152    /// is not included. Order matters to this array.
1153    pub primary_key_indices: Vec<usize>,
1154    ///  The indices of columns in value. The index of timestamp column is included.
1155    /// Order doesn't matter to this array.
1156    pub value_indices: Vec<usize>,
1157    /// Engine type of this table. Usually in small case.
1158    pub engine: String,
1159    /// Next column id of a new column.
1160    /// It's used to ensure all columns with the same name across all regions have the same column id.
1161    pub next_column_id: ColumnId,
1162    pub region_numbers: Vec<u32>,
1163    pub options: TableOptions,
1164    pub created_on: DateTime<Utc>,
1165    /// Order doesn't matter to this array.
1166    #[serde(default)]
1167    pub partition_key_indices: Vec<usize>,
1168    /// Map of column name to column id.
1169    /// Note: This field may be empty for older versions that did not include this field.
1170    #[serde(default)]
1171    pub column_ids: Vec<ColumnId>,
1172}
1173
1174impl From<TableMeta> for RawTableMeta {
1175    fn from(meta: TableMeta) -> RawTableMeta {
1176        RawTableMeta {
1177            schema: RawSchema::from(&*meta.schema),
1178            primary_key_indices: meta.primary_key_indices,
1179            value_indices: meta.value_indices,
1180            engine: meta.engine,
1181            next_column_id: meta.next_column_id,
1182            region_numbers: meta.region_numbers,
1183            options: meta.options,
1184            created_on: meta.created_on,
1185            partition_key_indices: meta.partition_key_indices,
1186            column_ids: meta.column_ids,
1187        }
1188    }
1189}
1190
1191impl TryFrom<RawTableMeta> for TableMeta {
1192    type Error = ConvertError;
1193
1194    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1195        Ok(TableMeta {
1196            schema: Arc::new(Schema::try_from(raw.schema)?),
1197            primary_key_indices: raw.primary_key_indices,
1198            value_indices: raw.value_indices,
1199            engine: raw.engine,
1200            region_numbers: raw.region_numbers,
1201            next_column_id: raw.next_column_id,
1202            options: raw.options,
1203            created_on: raw.created_on,
1204            partition_key_indices: raw.partition_key_indices,
1205            column_ids: raw.column_ids,
1206        })
1207    }
1208}
1209
1210/// Struct used to serialize and deserialize [`TableInfo`].
1211#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1212pub struct RawTableInfo {
1213    pub ident: TableIdent,
1214    pub name: String,
1215    pub desc: Option<String>,
1216    pub catalog_name: String,
1217    pub schema_name: String,
1218    pub meta: RawTableMeta,
1219    pub table_type: TableType,
1220}
1221
1222impl RawTableInfo {
1223    /// Returns the map of column name to column id.
1224    ///
1225    /// Note: This method may return an empty map for older versions that did not include this field.
1226    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1227        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1228            None
1229        } else {
1230            Some(
1231                self.meta
1232                    .column_ids
1233                    .iter()
1234                    .enumerate()
1235                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1236                    .collect(),
1237            )
1238        }
1239    }
1240
1241    /// Sort the columns in [RawTableInfo], logical tables require it.
1242    pub fn sort_columns(&mut self) {
1243        let column_schemas = &self.meta.schema.column_schemas;
1244        let primary_keys = self
1245            .meta
1246            .primary_key_indices
1247            .iter()
1248            .map(|index| column_schemas[*index].name.clone())
1249            .collect::<HashSet<_>>();
1250
1251        let name_to_ids = self.name_to_ids().unwrap_or_default();
1252        self.meta
1253            .schema
1254            .column_schemas
1255            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1256
1257        // Compute new indices of sorted columns
1258        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1259        let mut timestamp_index = None;
1260        let mut value_indices =
1261            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1262        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1263        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1264            if primary_keys.contains(&column_schema.name) {
1265                primary_key_indices.push(index);
1266            } else if column_schema.is_time_index() {
1267                value_indices.push(index);
1268                timestamp_index = Some(index);
1269            } else {
1270                value_indices.push(index);
1271            }
1272            if let Some(id) = name_to_ids.get(&column_schema.name) {
1273                column_ids.push(*id);
1274            }
1275        }
1276
1277        // Overwrite table meta
1278        self.meta.schema.timestamp_index = timestamp_index;
1279        self.meta.primary_key_indices = primary_key_indices;
1280        self.meta.value_indices = value_indices;
1281        self.meta.column_ids = column_ids;
1282    }
1283
1284    /// Extracts region options from table info.
1285    ///
1286    /// All "region options" are actually a copy of table options for redundancy.
1287    pub fn to_region_options(&self) -> HashMap<String, String> {
1288        HashMap::from(&self.meta.options)
1289    }
1290
1291    /// Returns the table reference.
1292    pub fn table_ref(&self) -> TableReference<'_> {
1293        TableReference::full(
1294            self.catalog_name.as_str(),
1295            self.schema_name.as_str(),
1296            self.name.as_str(),
1297        )
1298    }
1299}
1300
1301impl From<TableInfo> for RawTableInfo {
1302    fn from(info: TableInfo) -> RawTableInfo {
1303        RawTableInfo {
1304            ident: info.ident,
1305            name: info.name,
1306            desc: info.desc,
1307            catalog_name: info.catalog_name,
1308            schema_name: info.schema_name,
1309            meta: RawTableMeta::from(info.meta),
1310            table_type: info.table_type,
1311        }
1312    }
1313}
1314
1315impl TryFrom<RawTableInfo> for TableInfo {
1316    type Error = ConvertError;
1317
1318    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1319        Ok(TableInfo {
1320            ident: raw.ident,
1321            name: raw.name,
1322            desc: raw.desc,
1323            catalog_name: raw.catalog_name,
1324            schema_name: raw.schema_name,
1325            meta: TableMeta::try_from(raw.meta)?,
1326            table_type: raw.table_type,
1327        })
1328    }
1329}
1330
1331/// Set column fulltext options if it passed the validation.
1332///
1333/// Options allowed to modify:
1334/// * backend
1335///
1336/// Options not allowed to modify:
1337/// * analyzer
1338/// * case_sensitive
1339fn set_column_fulltext_options(
1340    column_schema: &mut ColumnSchema,
1341    column_name: &str,
1342    options: &FulltextOptions,
1343    current_options: Option<FulltextOptions>,
1344) -> Result<()> {
1345    if let Some(current_options) = current_options {
1346        ensure!(
1347            current_options.analyzer == options.analyzer
1348                && current_options.case_sensitive == options.case_sensitive,
1349            error::InvalidColumnOptionSnafu {
1350                column_name,
1351                msg: format!(
1352                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1353                    current_options.analyzer, current_options.case_sensitive
1354                ),
1355            }
1356        );
1357    }
1358
1359    column_schema
1360        .set_fulltext_options(options)
1361        .context(error::SetFulltextOptionsSnafu { column_name })?;
1362
1363    Ok(())
1364}
1365
1366fn unset_column_fulltext_options(
1367    column_schema: &mut ColumnSchema,
1368    column_name: &str,
1369    current_options: Option<FulltextOptions>,
1370) -> Result<()> {
1371    ensure!(
1372        current_options
1373            .as_ref()
1374            .is_some_and(|options| options.enable),
1375        error::InvalidColumnOptionSnafu {
1376            column_name,
1377            msg: "FULLTEXT index already disabled".to_string(),
1378        }
1379    );
1380
1381    let mut options = current_options.unwrap();
1382    options.enable = false;
1383    column_schema
1384        .set_fulltext_options(&options)
1385        .context(error::SetFulltextOptionsSnafu { column_name })?;
1386
1387    Ok(())
1388}
1389
1390fn set_column_skipping_index_options(
1391    column_schema: &mut ColumnSchema,
1392    column_name: &str,
1393    options: &SkippingIndexOptions,
1394) -> Result<()> {
1395    column_schema
1396        .set_skipping_options(options)
1397        .context(error::SetSkippingOptionsSnafu { column_name })?;
1398
1399    Ok(())
1400}
1401
1402fn unset_column_skipping_index_options(
1403    column_schema: &mut ColumnSchema,
1404    column_name: &str,
1405) -> Result<()> {
1406    column_schema
1407        .unset_skipping_options()
1408        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1409    Ok(())
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414    use std::assert_matches::assert_matches;
1415
1416    use common_error::ext::ErrorExt;
1417    use common_error::status_code::StatusCode;
1418    use datatypes::data_type::ConcreteDataType;
1419    use datatypes::schema::{
1420        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1421    };
1422
1423    use super::*;
1424    use crate::Error;
1425
1426    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1427    fn new_test_schema() -> Schema {
1428        let column_schemas = vec![
1429            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1430            ColumnSchema::new(
1431                "ts",
1432                ConcreteDataType::timestamp_millisecond_datatype(),
1433                false,
1434            )
1435            .with_time_index(true),
1436            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1437        ];
1438        SchemaBuilder::try_from(column_schemas)
1439            .unwrap()
1440            .version(123)
1441            .build()
1442            .unwrap()
1443    }
1444
1445    #[test]
1446    fn test_raw_convert() {
1447        let schema = Arc::new(new_test_schema());
1448        let meta = TableMetaBuilder::empty()
1449            .schema(schema)
1450            .primary_key_indices(vec![0])
1451            .engine("engine")
1452            .next_column_id(3)
1453            .build()
1454            .unwrap();
1455        let info = TableInfoBuilder::default()
1456            .table_id(10)
1457            .table_version(5)
1458            .name("mytable")
1459            .meta(meta)
1460            .build()
1461            .unwrap();
1462
1463        let raw = RawTableInfo::from(info.clone());
1464        let info_new = TableInfo::try_from(raw).unwrap();
1465
1466        assert_eq!(info, info_new);
1467    }
1468
1469    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1470        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1471        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1472        let alter_kind = AlterKind::AddColumns {
1473            columns: vec![
1474                AddColumnRequest {
1475                    column_schema: new_tag,
1476                    is_key: true,
1477                    location: None,
1478                    add_if_not_exists: false,
1479                },
1480                AddColumnRequest {
1481                    column_schema: new_field,
1482                    is_key: false,
1483                    location: None,
1484                    add_if_not_exists: false,
1485                },
1486            ],
1487        };
1488
1489        let builder = meta
1490            .builder_with_alter_kind("my_table", &alter_kind)
1491            .unwrap();
1492        builder.build().unwrap()
1493    }
1494
1495    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1496        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1497        let new_field = ColumnSchema::new(
1498            "my_field_after_ts",
1499            ConcreteDataType::string_datatype(),
1500            true,
1501        );
1502        let yet_another_field = ColumnSchema::new(
1503            "yet_another_field_after_ts",
1504            ConcreteDataType::int64_datatype(),
1505            true,
1506        );
1507        let alter_kind = AlterKind::AddColumns {
1508            columns: vec![
1509                AddColumnRequest {
1510                    column_schema: new_tag,
1511                    is_key: true,
1512                    location: Some(AddColumnLocation::First),
1513                    add_if_not_exists: false,
1514                },
1515                AddColumnRequest {
1516                    column_schema: new_field,
1517                    is_key: false,
1518                    location: Some(AddColumnLocation::After {
1519                        column_name: "ts".to_string(),
1520                    }),
1521                    add_if_not_exists: false,
1522                },
1523                AddColumnRequest {
1524                    column_schema: yet_another_field,
1525                    is_key: true,
1526                    location: Some(AddColumnLocation::After {
1527                        column_name: "ts".to_string(),
1528                    }),
1529                    add_if_not_exists: false,
1530                },
1531            ],
1532        };
1533
1534        let builder = meta
1535            .builder_with_alter_kind("my_table", &alter_kind)
1536            .unwrap();
1537        builder.build().unwrap()
1538    }
1539
1540    #[test]
1541    fn test_add_columns() {
1542        let schema = Arc::new(new_test_schema());
1543        let meta = TableMetaBuilder::empty()
1544            .schema(schema)
1545            .primary_key_indices(vec![0])
1546            .engine("engine")
1547            .next_column_id(3)
1548            .build()
1549            .unwrap();
1550
1551        let new_meta = add_columns_to_meta(&meta);
1552        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1553
1554        let names: Vec<String> = new_meta
1555            .schema
1556            .column_schemas()
1557            .iter()
1558            .map(|column_schema| column_schema.name.clone())
1559            .collect();
1560        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1561        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1562        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1563    }
1564
1565    #[test]
1566    fn test_add_columns_multiple_times() {
1567        let schema = Arc::new(new_test_schema());
1568        let meta = TableMetaBuilder::empty()
1569            .schema(schema)
1570            .primary_key_indices(vec![0])
1571            .engine("engine")
1572            .next_column_id(3)
1573            .build()
1574            .unwrap();
1575
1576        let alter_kind = AlterKind::AddColumns {
1577            columns: vec![
1578                AddColumnRequest {
1579                    column_schema: ColumnSchema::new(
1580                        "col3",
1581                        ConcreteDataType::int32_datatype(),
1582                        true,
1583                    ),
1584                    is_key: true,
1585                    location: None,
1586                    add_if_not_exists: true,
1587                },
1588                AddColumnRequest {
1589                    column_schema: ColumnSchema::new(
1590                        "col3",
1591                        ConcreteDataType::int32_datatype(),
1592                        true,
1593                    ),
1594                    is_key: true,
1595                    location: None,
1596                    add_if_not_exists: true,
1597                },
1598            ],
1599        };
1600        let err = meta
1601            .builder_with_alter_kind("my_table", &alter_kind)
1602            .err()
1603            .unwrap();
1604        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1605    }
1606
1607    #[test]
1608    fn test_remove_columns() {
1609        let schema = Arc::new(new_test_schema());
1610        let meta = TableMetaBuilder::empty()
1611            .schema(schema.clone())
1612            .primary_key_indices(vec![0])
1613            .engine("engine")
1614            .next_column_id(3)
1615            .build()
1616            .unwrap();
1617        // Add more columns so we have enough candidate columns to remove.
1618        let meta = add_columns_to_meta(&meta);
1619
1620        let alter_kind = AlterKind::DropColumns {
1621            names: vec![String::from("col2"), String::from("my_field")],
1622        };
1623        let new_meta = meta
1624            .builder_with_alter_kind("my_table", &alter_kind)
1625            .unwrap()
1626            .build()
1627            .unwrap();
1628
1629        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1630
1631        let names: Vec<String> = new_meta
1632            .schema
1633            .column_schemas()
1634            .iter()
1635            .map(|column_schema| column_schema.name.clone())
1636            .collect();
1637        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1638        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1639        assert_eq!(&[1], &new_meta.value_indices[..]);
1640        assert_eq!(
1641            schema.timestamp_column(),
1642            new_meta.schema.timestamp_column()
1643        );
1644    }
1645
1646    #[test]
1647    fn test_remove_multiple_columns_before_timestamp() {
1648        let column_schemas = vec![
1649            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1650            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1651            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1652            ColumnSchema::new(
1653                "ts",
1654                ConcreteDataType::timestamp_millisecond_datatype(),
1655                false,
1656            )
1657            .with_time_index(true),
1658        ];
1659        let schema = Arc::new(
1660            SchemaBuilder::try_from(column_schemas)
1661                .unwrap()
1662                .version(123)
1663                .build()
1664                .unwrap(),
1665        );
1666        let meta = TableMetaBuilder::empty()
1667            .schema(schema.clone())
1668            .primary_key_indices(vec![1])
1669            .engine("engine")
1670            .next_column_id(4)
1671            .build()
1672            .unwrap();
1673
1674        // Remove columns in reverse order to test whether timestamp index is valid.
1675        let alter_kind = AlterKind::DropColumns {
1676            names: vec![String::from("col3"), String::from("col1")],
1677        };
1678        let new_meta = meta
1679            .builder_with_alter_kind("my_table", &alter_kind)
1680            .unwrap()
1681            .build()
1682            .unwrap();
1683
1684        let names: Vec<String> = new_meta
1685            .schema
1686            .column_schemas()
1687            .iter()
1688            .map(|column_schema| column_schema.name.clone())
1689            .collect();
1690        assert_eq!(&["col2", "ts"], &names[..]);
1691        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1692        assert_eq!(&[1], &new_meta.value_indices[..]);
1693        assert_eq!(
1694            schema.timestamp_column(),
1695            new_meta.schema.timestamp_column()
1696        );
1697    }
1698
1699    #[test]
1700    fn test_add_existing_column() {
1701        let schema = Arc::new(new_test_schema());
1702        let meta = TableMetaBuilder::empty()
1703            .schema(schema)
1704            .primary_key_indices(vec![0])
1705            .engine("engine")
1706            .next_column_id(3)
1707            .build()
1708            .unwrap();
1709
1710        let alter_kind = AlterKind::AddColumns {
1711            columns: vec![AddColumnRequest {
1712                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1713                is_key: false,
1714                location: None,
1715                add_if_not_exists: false,
1716            }],
1717        };
1718
1719        let err = meta
1720            .builder_with_alter_kind("my_table", &alter_kind)
1721            .err()
1722            .unwrap();
1723        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1724
1725        // Add if not exists
1726        let alter_kind = AlterKind::AddColumns {
1727            columns: vec![AddColumnRequest {
1728                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1729                is_key: true,
1730                location: None,
1731                add_if_not_exists: true,
1732            }],
1733        };
1734        let new_meta = meta
1735            .builder_with_alter_kind("my_table", &alter_kind)
1736            .unwrap()
1737            .build()
1738            .unwrap();
1739        assert_eq!(
1740            meta.schema.column_schemas(),
1741            new_meta.schema.column_schemas()
1742        );
1743        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1744    }
1745
1746    #[test]
1747    fn test_add_different_type_column() {
1748        let schema = Arc::new(new_test_schema());
1749        let meta = TableMetaBuilder::empty()
1750            .schema(schema)
1751            .primary_key_indices(vec![0])
1752            .engine("engine")
1753            .next_column_id(3)
1754            .build()
1755            .unwrap();
1756
1757        // Add if not exists, but different type.
1758        let alter_kind = AlterKind::AddColumns {
1759            columns: vec![AddColumnRequest {
1760                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1761                is_key: false,
1762                location: None,
1763                add_if_not_exists: true,
1764            }],
1765        };
1766        let err = meta
1767            .builder_with_alter_kind("my_table", &alter_kind)
1768            .err()
1769            .unwrap();
1770        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1771    }
1772
1773    #[test]
1774    fn test_add_invalid_column() {
1775        let schema = Arc::new(new_test_schema());
1776        let meta = TableMetaBuilder::empty()
1777            .schema(schema)
1778            .primary_key_indices(vec![0])
1779            .engine("engine")
1780            .next_column_id(3)
1781            .build()
1782            .unwrap();
1783
1784        // Not nullable and no default value.
1785        let alter_kind = AlterKind::AddColumns {
1786            columns: vec![AddColumnRequest {
1787                column_schema: ColumnSchema::new(
1788                    "weny",
1789                    ConcreteDataType::string_datatype(),
1790                    false,
1791                ),
1792                is_key: false,
1793                location: None,
1794                add_if_not_exists: false,
1795            }],
1796        };
1797
1798        let err = meta
1799            .builder_with_alter_kind("my_table", &alter_kind)
1800            .err()
1801            .unwrap();
1802        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1803    }
1804
1805    #[test]
1806    fn test_remove_unknown_column() {
1807        let schema = Arc::new(new_test_schema());
1808        let meta = TableMetaBuilder::empty()
1809            .schema(schema)
1810            .primary_key_indices(vec![0])
1811            .engine("engine")
1812            .next_column_id(3)
1813            .build()
1814            .unwrap();
1815
1816        let alter_kind = AlterKind::DropColumns {
1817            names: vec![String::from("unknown")],
1818        };
1819
1820        let err = meta
1821            .builder_with_alter_kind("my_table", &alter_kind)
1822            .err()
1823            .unwrap();
1824        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1825    }
1826
1827    #[test]
1828    fn test_change_unknown_column_data_type() {
1829        let schema = Arc::new(new_test_schema());
1830        let meta = TableMetaBuilder::empty()
1831            .schema(schema)
1832            .primary_key_indices(vec![0])
1833            .engine("engine")
1834            .next_column_id(3)
1835            .build()
1836            .unwrap();
1837
1838        let alter_kind = AlterKind::ModifyColumnTypes {
1839            columns: vec![ModifyColumnTypeRequest {
1840                column_name: "unknown".to_string(),
1841                target_type: ConcreteDataType::string_datatype(),
1842            }],
1843        };
1844
1845        let err = meta
1846            .builder_with_alter_kind("my_table", &alter_kind)
1847            .err()
1848            .unwrap();
1849        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1850    }
1851
1852    #[test]
1853    fn test_remove_key_column() {
1854        let schema = Arc::new(new_test_schema());
1855        let meta = TableMetaBuilder::empty()
1856            .schema(schema)
1857            .primary_key_indices(vec![0])
1858            .engine("engine")
1859            .next_column_id(3)
1860            .build()
1861            .unwrap();
1862
1863        // Remove column in primary key.
1864        let alter_kind = AlterKind::DropColumns {
1865            names: vec![String::from("col1")],
1866        };
1867
1868        let err = meta
1869            .builder_with_alter_kind("my_table", &alter_kind)
1870            .err()
1871            .unwrap();
1872        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1873
1874        // Remove timestamp column.
1875        let alter_kind = AlterKind::DropColumns {
1876            names: vec![String::from("ts")],
1877        };
1878
1879        let err = meta
1880            .builder_with_alter_kind("my_table", &alter_kind)
1881            .err()
1882            .unwrap();
1883        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1884    }
1885
1886    #[test]
1887    fn test_remove_partition_column() {
1888        let schema = Arc::new(new_test_schema());
1889        let meta = TableMetaBuilder::empty()
1890            .schema(schema)
1891            .primary_key_indices(vec![])
1892            .partition_key_indices(vec![0])
1893            .engine("engine")
1894            .next_column_id(3)
1895            .build()
1896            .unwrap();
1897        // Remove column in primary key.
1898        let alter_kind = AlterKind::DropColumns {
1899            names: vec![String::from("col1")],
1900        };
1901
1902        let err = meta
1903            .builder_with_alter_kind("my_table", &alter_kind)
1904            .err()
1905            .unwrap();
1906        assert_matches!(err, Error::RemovePartitionColumn { .. });
1907    }
1908
1909    #[test]
1910    fn test_change_key_column_data_type() {
1911        let schema = Arc::new(new_test_schema());
1912        let meta = TableMetaBuilder::empty()
1913            .schema(schema)
1914            .primary_key_indices(vec![0])
1915            .engine("engine")
1916            .next_column_id(3)
1917            .build()
1918            .unwrap();
1919
1920        // Remove column in primary key.
1921        let alter_kind = AlterKind::ModifyColumnTypes {
1922            columns: vec![ModifyColumnTypeRequest {
1923                column_name: "col1".to_string(),
1924                target_type: ConcreteDataType::string_datatype(),
1925            }],
1926        };
1927
1928        let err = meta
1929            .builder_with_alter_kind("my_table", &alter_kind)
1930            .err()
1931            .unwrap();
1932        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1933
1934        // Remove timestamp column.
1935        let alter_kind = AlterKind::ModifyColumnTypes {
1936            columns: vec![ModifyColumnTypeRequest {
1937                column_name: "ts".to_string(),
1938                target_type: ConcreteDataType::string_datatype(),
1939            }],
1940        };
1941
1942        let err = meta
1943            .builder_with_alter_kind("my_table", &alter_kind)
1944            .err()
1945            .unwrap();
1946        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1947    }
1948
1949    #[test]
1950    fn test_alloc_new_column() {
1951        let schema = Arc::new(new_test_schema());
1952        let mut meta = TableMetaBuilder::empty()
1953            .schema(schema)
1954            .primary_key_indices(vec![0])
1955            .engine("engine")
1956            .next_column_id(3)
1957            .build()
1958            .unwrap();
1959        assert_eq!(3, meta.next_column_id);
1960
1961        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1962        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1963
1964        assert_eq!(4, meta.next_column_id);
1965        assert_eq!(column_schema.name, desc.name);
1966    }
1967
1968    #[test]
1969    fn test_add_columns_with_location() {
1970        let schema = Arc::new(new_test_schema());
1971        let meta = TableMetaBuilder::empty()
1972            .schema(schema)
1973            .primary_key_indices(vec![0])
1974            // partition col: col1, col2
1975            .partition_key_indices(vec![0, 2])
1976            .engine("engine")
1977            .next_column_id(3)
1978            .build()
1979            .unwrap();
1980
1981        let new_meta = add_columns_to_meta_with_location(&meta);
1982        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1983
1984        let names: Vec<String> = new_meta
1985            .schema
1986            .column_schemas()
1987            .iter()
1988            .map(|column_schema| column_schema.name.clone())
1989            .collect();
1990        assert_eq!(
1991            &[
1992                "my_tag_first",               // primary key column
1993                "col1",                       // partition column
1994                "ts",                         // timestamp column
1995                "yet_another_field_after_ts", // primary key column
1996                "my_field_after_ts",          // value column
1997                "col2",                       // partition column
1998            ],
1999            &names[..]
2000        );
2001        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2002        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2003        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2004    }
2005
2006    #[test]
2007    fn test_modify_column_fulltext_options() {
2008        let schema = Arc::new(new_test_schema());
2009        let meta = TableMetaBuilder::empty()
2010            .schema(schema)
2011            .primary_key_indices(vec![0])
2012            .engine("engine")
2013            .next_column_id(3)
2014            .build()
2015            .unwrap();
2016
2017        let alter_kind = AlterKind::SetIndexes {
2018            options: vec![SetIndexOption::Fulltext {
2019                column_name: "col1".to_string(),
2020                options: FulltextOptions::default(),
2021            }],
2022        };
2023        let err = meta
2024            .builder_with_alter_kind("my_table", &alter_kind)
2025            .err()
2026            .unwrap();
2027        assert_eq!(
2028            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2029            err.to_string()
2030        );
2031
2032        // Add a string column and make it fulltext indexed
2033        let new_meta = add_columns_to_meta_with_location(&meta);
2034        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2035
2036        let alter_kind = AlterKind::SetIndexes {
2037            options: vec![SetIndexOption::Fulltext {
2038                column_name: "my_tag_first".to_string(),
2039                options: FulltextOptions::new_unchecked(
2040                    true,
2041                    FulltextAnalyzer::Chinese,
2042                    true,
2043                    FulltextBackend::Bloom,
2044                    1000,
2045                    0.01,
2046                ),
2047            }],
2048        };
2049        let new_meta = new_meta
2050            .builder_with_alter_kind("my_table", &alter_kind)
2051            .unwrap()
2052            .build()
2053            .unwrap();
2054        let column_schema = new_meta
2055            .schema
2056            .column_schema_by_name("my_tag_first")
2057            .unwrap();
2058        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2059        assert!(fulltext_options.enable);
2060        assert_eq!(
2061            datatypes::schema::FulltextAnalyzer::Chinese,
2062            fulltext_options.analyzer
2063        );
2064        assert!(fulltext_options.case_sensitive);
2065
2066        let alter_kind = AlterKind::UnsetIndexes {
2067            options: vec![UnsetIndexOption::Fulltext {
2068                column_name: "my_tag_first".to_string(),
2069            }],
2070        };
2071        let new_meta = new_meta
2072            .builder_with_alter_kind("my_table", &alter_kind)
2073            .unwrap()
2074            .build()
2075            .unwrap();
2076        let column_schema = new_meta
2077            .schema
2078            .column_schema_by_name("my_tag_first")
2079            .unwrap();
2080        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2081        assert!(!fulltext_options.enable);
2082    }
2083}