table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39    UnsetIndexOptions,
40};
41use crate::table_reference::TableReference;
42
/// Numeric identifier of a table; stored in [`TableIdent::table_id`].
pub type TableId = u32;
/// Version counter of a table; stored in [`TableIdent::version`].
pub type TableVersion = u64;
45
/// Indicates whether and how a filter expression can be handled by a
/// Table for table scans.
///
/// Every variant converts one-to-one to and from the same-named variant of
/// DataFusion's [`TableProviderFilterPushDown`] through the `From` impls in this module.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to help minimise the data retrieved,
    /// but the provider cannot guarantee that all returned tuples
    /// satisfy the filter. The Filter plan node containing this expression
    /// will be preserved.
    Inexact,
    /// The provider guarantees that all returned data satisfies this
    /// filter expression. The Filter plan node containing this expression
    /// will be removed.
    Exact,
}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
/// Indicates the type of this table for metadata/catalog purposes.
///
/// The `Display` impl renders the variants as `BASE TABLE`, `VIEW` and `TEMPORARY`.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// An ordinary physical table.
    Base,
    /// A non-materialised table that itself uses a query internally to provide data.
    View,
    /// A transient table.
    Temporary,
}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
/// Identifier of the table.
///
/// The derived `Default` yields a `table_id` of 0 and a `version` of 0.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Unique id of this table.
    pub table_id: TableId,
    /// Version of the table, bumped when metadata (such as the schema) of the table
    /// is changed.
    pub version: TableVersion,
}
113
/// The table metadata.
///
/// A [TableMetaBuilder] is derived for this struct; the `#[builder(...)]` attributes below
/// define what each field defaults to when it is not set on the builder.
///
/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table. All `*_indices` fields below index into its column schemas.
    pub schema: SchemaRef,
    /// The indices of columns in primary key. Note that the index of timestamp column
    /// is not included in these indices.
    pub primary_key_indices: Vec<usize>,
    /// Indices of the value columns. When not set on the builder, this is computed as every
    /// column index that is not in `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Engine name; an empty string when not set on the builder.
    #[builder(default, setter(into))]
    pub engine: String,
    /// Region numbers of the table; empty when not set on the builder.
    #[builder(default, setter(into))]
    pub region_numbers: Vec<u32>,
    /// The id that `alloc_new_column` assigns to the next allocated column before
    /// incrementing this counter.
    pub next_column_id: ColumnId,
    /// Table options.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation time; the time the builder runs (`Utc::now()`) when not set explicitly.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Indices of the partition key columns; empty when not set on the builder.
    /// Columns listed here are refused by `remove_columns`.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; empty when not set on the builder.
    // NOTE(review): presumably aligned with the order of the schema's columns — confirm.
    #[builder(default = "Vec::new()")]
    pub column_ids: Vec<ColumnId>,
}
141
impl TableMetaBuilder {
    /// Creates a builder with every field unset (`None`).
    ///
    /// Only compiled for tests or when the `testing` feature is enabled.
    ///
    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
    #[cfg(any(test, feature = "testing"))]
    pub fn empty() -> Self {
        Self {
            schema: None,
            primary_key_indices: None,
            value_indices: None,
            engine: None,
            region_numbers: None,
            next_column_id: None,
            options: None,
            created_on: None,
            partition_key_indices: None,
            column_ids: None,
        }
    }
}
160
161impl TableMetaBuilder {
162    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
163        match (&self.primary_key_indices, &self.schema) {
164            (Some(v), Some(schema)) => {
165                let column_schemas = schema.column_schemas();
166                Ok((0..column_schemas.len())
167                    .filter(|idx| !v.contains(idx))
168                    .collect())
169            }
170            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
171        }
172    }
173
174    pub fn new_external_table() -> Self {
175        Self {
176            schema: None,
177            primary_key_indices: Some(Vec::new()),
178            value_indices: Some(Vec::new()),
179            engine: None,
180            region_numbers: Some(Vec::new()),
181            next_column_id: Some(0),
182            options: None,
183            created_on: None,
184            partition_key_indices: None,
185            column_ids: None,
186        }
187    }
188}
189
/// The result after splitting requests by column location info.
///
/// `add_columns` consumes this to lay out the new schema: first the `columns_at_first`
/// requests, then the existing columns (each followed by its `columns_at_after` requests),
/// and finally the `columns_at_last` requests.
struct SplitResult<'a> {
    /// Column requests that should be added at the first place.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Column requests that should be added after an already existing column,
    /// keyed by the name of that existing column.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Column requests that should be added at the last place.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all columns that should be added.
    column_names: Vec<String>,
}
201
202impl TableMeta {
203    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
204        let columns_schemas = &self.schema.column_schemas();
205        self.primary_key_indices
206            .iter()
207            .map(|idx| &columns_schemas[*idx].name)
208    }
209
210    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
211        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
212        let columns_schemas = self.schema.column_schemas();
213        let primary_key_indices = &self.primary_key_indices;
214        columns_schemas
215            .iter()
216            .enumerate()
217            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
218            .map(|(_, cs)| &cs.name)
219    }
220
221    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
222    ///
223    /// The returned builder would derive the next column id of this meta.
224    pub fn builder_with_alter_kind(
225        &self,
226        table_name: &str,
227        alter_kind: &AlterKind,
228    ) -> Result<TableMetaBuilder> {
229        match alter_kind {
230            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
231            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
232            AlterKind::ModifyColumnTypes { columns } => {
233                self.modify_column_types(table_name, columns)
234            }
235            // No need to rebuild table meta when renaming tables.
236            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
237            AlterKind::SetTableOptions { options } => self.set_table_options(options),
238            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
239            AlterKind::SetIndex { options } => match options {
240                SetIndexOptions::Fulltext {
241                    column_name,
242                    options,
243                } => self.change_column_fulltext_options(
244                    table_name,
245                    column_name,
246                    true,
247                    Some(options),
248                ),
249                SetIndexOptions::Inverted { column_name } => {
250                    self.change_column_modify_inverted_index(table_name, column_name, true)
251                }
252                SetIndexOptions::Skipping {
253                    column_name,
254                    options,
255                } => self.change_column_skipping_index_options(
256                    table_name,
257                    column_name,
258                    Some(options),
259                ),
260            },
261            AlterKind::UnsetIndex { options } => match options {
262                UnsetIndexOptions::Fulltext { column_name } => {
263                    self.change_column_fulltext_options(table_name, column_name, false, None)
264                }
265                UnsetIndexOptions::Inverted { column_name } => {
266                    self.change_column_modify_inverted_index(table_name, column_name, false)
267                }
268                UnsetIndexOptions::Skipping { column_name } => {
269                    self.change_column_skipping_index_options(table_name, column_name, None)
270                }
271            },
272            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
273        }
274    }
275
276    /// Creates a [TableMetaBuilder] with modified table options.
277    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
278        let mut new_options = self.options.clone();
279
280        for request in requests {
281            match request {
282                SetRegionOption::Ttl(new_ttl) => {
283                    new_options.ttl = *new_ttl;
284                }
285                SetRegionOption::Twsc(key, value) => {
286                    if !value.is_empty() {
287                        new_options
288                            .extra_options
289                            .insert(key.to_string(), value.to_string());
290                        // Ensure node restart correctly.
291                        new_options.extra_options.insert(
292                            COMPACTION_TYPE.to_string(),
293                            COMPACTION_TYPE_TWCS.to_string(),
294                        );
295                    } else {
296                        // Invalidate the previous change option if an empty value has been set.
297                        new_options.extra_options.remove(key.as_str());
298                    }
299                }
300            }
301        }
302        let mut builder = self.new_meta_builder();
303        builder.options(new_options);
304
305        Ok(builder)
306    }
307
308    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
309        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
310        self.set_table_options(&requests)
311    }
312
313    /// Creates a [TableMetaBuilder] with modified column inverted index.
314    fn change_column_modify_inverted_index(
315        &self,
316        table_name: &str,
317        column_name: &str,
318        value: bool,
319    ) -> Result<TableMetaBuilder> {
320        let table_schema = &self.schema;
321        let mut meta_builder = self.new_meta_builder();
322
323        let mut columns: Vec<ColumnSchema> =
324            Vec::with_capacity(table_schema.column_schemas().len());
325
326        for column_schema in table_schema.column_schemas().iter() {
327            if column_schema.name == column_name {
328                let mut new_column_schema = column_schema.clone();
329                new_column_schema.set_inverted_index(value);
330                columns.push(new_column_schema);
331            } else {
332                columns.push(column_schema.clone());
333            }
334        }
335
336        // TODO(CookiePieWw): This part for all alter table operations is similar. We can refactor it.
337        let mut builder = SchemaBuilder::try_from_columns(columns)
338            .with_context(|_| error::SchemaBuildSnafu {
339                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
340            })?
341            .version(table_schema.version() + 1);
342
343        for (k, v) in table_schema.metadata().iter() {
344            builder = builder.add_metadata(k, v);
345        }
346
347        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
348            msg: format!(
349                "Table {table_name} cannot change fulltext options for column {column_name}",
350            ),
351        })?;
352
353        let _ = meta_builder
354            .schema(Arc::new(new_schema))
355            .primary_key_indices(self.primary_key_indices.clone());
356
357        Ok(meta_builder)
358    }
359
360    /// Creates a [TableMetaBuilder] with modified column fulltext options.
361    fn change_column_fulltext_options(
362        &self,
363        table_name: &str,
364        column_name: &str,
365        enable: bool,
366        options: Option<&FulltextOptions>,
367    ) -> Result<TableMetaBuilder> {
368        let table_schema = &self.schema;
369        let mut meta_builder = self.new_meta_builder();
370
371        let column = &table_schema
372            .column_schema_by_name(column_name)
373            .with_context(|| error::ColumnNotExistsSnafu {
374                column_name,
375                table_name,
376            })?;
377
378        ensure!(
379            column.data_type.is_string(),
380            error::InvalidColumnOptionSnafu {
381                column_name,
382                msg: "FULLTEXT index only supports string type",
383            }
384        );
385
386        let current_fulltext_options = column
387            .fulltext_options()
388            .context(error::SetFulltextOptionsSnafu { column_name })?;
389
390        let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
391        for column_schema in table_schema.column_schemas() {
392            if column_schema.name == column_name {
393                let mut new_column_schema = column_schema.clone();
394                if enable {
395                    ensure!(
396                        options.is_some(),
397                        error::InvalidColumnOptionSnafu {
398                            column_name,
399                            msg: "FULLTEXT index options must be provided",
400                        }
401                    );
402                    set_column_fulltext_options(
403                        &mut new_column_schema,
404                        column_name,
405                        options.unwrap(),
406                        current_fulltext_options.clone(),
407                    )?
408                } else {
409                    unset_column_fulltext_options(
410                        &mut new_column_schema,
411                        column_name,
412                        current_fulltext_options.clone(),
413                    )?
414                }
415                columns.push(new_column_schema);
416            } else {
417                columns.push(column_schema.clone());
418            }
419        }
420
421        // TODO(CookiePieWw): This part for all alter table operations is similar. We can refactor it.
422        let mut builder = SchemaBuilder::try_from_columns(columns)
423            .with_context(|_| error::SchemaBuildSnafu {
424                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
425            })?
426            .version(table_schema.version() + 1);
427
428        for (k, v) in table_schema.metadata().iter() {
429            builder = builder.add_metadata(k, v);
430        }
431
432        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
433            msg: format!(
434                "Table {table_name} cannot change fulltext options for column {column_name}",
435            ),
436        })?;
437
438        let _ = meta_builder
439            .schema(Arc::new(new_schema))
440            .primary_key_indices(self.primary_key_indices.clone());
441
442        Ok(meta_builder)
443    }
444
445    /// Creates a [TableMetaBuilder] with modified column skipping index options.
446    fn change_column_skipping_index_options(
447        &self,
448        table_name: &str,
449        column_name: &str,
450        options: Option<&SkippingIndexOptions>,
451    ) -> Result<TableMetaBuilder> {
452        let table_schema = &self.schema;
453        let mut meta_builder = self.new_meta_builder();
454
455        let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
456        for column_schema in table_schema.column_schemas() {
457            if column_schema.name == column_name {
458                let mut new_column_schema = column_schema.clone();
459                if let Some(options) = options {
460                    set_column_skipping_index_options(
461                        &mut new_column_schema,
462                        column_name,
463                        options,
464                    )?;
465                } else {
466                    unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
467                }
468                columns.push(new_column_schema);
469            } else {
470                columns.push(column_schema.clone());
471            }
472        }
473
474        let mut builder = SchemaBuilder::try_from_columns(columns)
475            .with_context(|_| error::SchemaBuildSnafu {
476                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
477            })?
478            .version(table_schema.version() + 1);
479
480        for (k, v) in table_schema.metadata().iter() {
481            builder = builder.add_metadata(k, v);
482        }
483
484        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
485            msg: format!("Failed to convert column schemas into schema for table {table_name}"),
486        })?;
487
488        let _ = meta_builder
489            .schema(Arc::new(new_schema))
490            .primary_key_indices(self.primary_key_indices.clone());
491
492        Ok(meta_builder)
493    }
494
495    // TODO(yingwen): Remove this.
496    /// Allocate a new column for the table.
497    ///
498    /// This method would bump the `next_column_id` of the meta.
499    pub fn alloc_new_column(
500        &mut self,
501        table_name: &str,
502        new_column: &ColumnSchema,
503    ) -> Result<ColumnDescriptor> {
504        let desc = ColumnDescriptorBuilder::new(
505            self.next_column_id as ColumnId,
506            &new_column.name,
507            new_column.data_type.clone(),
508        )
509        .is_nullable(new_column.is_nullable())
510        .default_constraint(new_column.default_constraint().cloned())
511        .build()
512        .context(error::BuildColumnDescriptorSnafu {
513            table_name,
514            column_name: &new_column.name,
515        })?;
516
517        // Bump next column id.
518        self.next_column_id += 1;
519
520        Ok(desc)
521    }
522
523    /// Create a [`TableMetaBuilder`] from the current TableMeta.
524    fn new_meta_builder(&self) -> TableMetaBuilder {
525        let mut builder = TableMetaBuilder::from(self);
526        // Manually remove value_indices.
527        builder.value_indices = None;
528        builder
529    }
530
    // TODO(yingwen): Tests add if not exists.
    /// Creates a [TableMetaBuilder] whose schema additionally contains the columns described
    /// by `requests`.
    ///
    /// Validation of each request:
    /// - If the column already exists, the request must have `add_if_not_exists` set and the
    ///   same data type as the existing column; such a request is then skipped.
    /// - A new column may be requested only once and must be nullable or have a default
    ///   constraint.
    ///
    /// The remaining requests are grouped by `split_requests_by_column_location` and laid out
    /// as: columns to put first, then the existing columns (each followed by the columns
    /// requested after it), then the columns without location info. While laying out the
    /// columns, the primary key indices are rebuilt so that existing key columns keep their
    /// membership and new columns with `is_key` are added to it.
    ///
    /// The new schema gets the old version plus one and the old schema's metadata; the
    /// partition key indices are remapped by column name. `table_name` is only used to build
    /// error messages.
    fn add_columns(
        &self,
        table_name: &str,
        requests: &[AddColumnRequest],
    ) -> Result<TableMetaBuilder> {
        let table_schema = &self.schema;
        let mut meta_builder = self.new_meta_builder();
        let original_primary_key_indices: HashSet<&usize> =
            self.primary_key_indices.iter().collect();

        // Names of the new columns seen so far, used to reject duplicated requests.
        let mut names = HashSet::with_capacity(requests.len());
        // Requests that really add a column (requests for existing columns are dropped).
        let mut new_columns = Vec::with_capacity(requests.len());
        for col_to_add in requests {
            if let Some(column_schema) =
                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
            {
                // If the column already exists.
                ensure!(
                    col_to_add.add_if_not_exists,
                    error::ColumnExistsSnafu {
                        table_name,
                        column_name: &col_to_add.column_schema.name
                    },
                );

                // Checks if the type is the same
                ensure!(
                    column_schema.data_type == col_to_add.column_schema.data_type,
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "column {} already exists with different type {:?}",
                            col_to_add.column_schema.name, column_schema.data_type,
                        ),
                    }
                );
            } else {
                // A new column.
                // Ensures we only add a column once.
                ensure!(
                    names.insert(&col_to_add.column_schema.name),
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "add column {} more than once",
                            col_to_add.column_schema.name
                        ),
                    }
                );

                // A column that is neither nullable nor has a default constraint is rejected.
                ensure!(
                    col_to_add.column_schema.is_nullable()
                        || col_to_add.column_schema.default_constraint().is_some(),
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "no default value for column {}",
                            col_to_add.column_schema.name
                        ),
                    },
                );

                new_columns.push(col_to_add.clone());
            }
        }
        // From here on only the requests that add a new column are considered.
        let requests = &new_columns[..];

        let SplitResult {
            columns_at_first,
            columns_at_after,
            columns_at_last,
            column_names,
        } = self.split_requests_by_column_location(table_name, requests)?;
        // Both vectors are filled in lockstep: `columns.len()` is always the index the next
        // pushed column will get in the new schema.
        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
        // add new columns with FIRST, and in reverse order of requests.
        columns_at_first.iter().rev().for_each(|request| {
            if request.is_key {
                // If a key column is added, we also need to store its index in primary_key_indices.
                primary_key_indices.push(columns.len());
            }
            columns.push(request.column_schema.clone());
        });
        // add existed columns in original order and handle new columns with AFTER.
        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
            if original_primary_key_indices.contains(&index) {
                primary_key_indices.push(columns.len());
            }
            columns.push(column_schema.clone());
            if let Some(requests) = columns_at_after.get(&column_schema.name) {
                requests.iter().rev().for_each(|request| {
                    if request.is_key {
                        // If a key column is added, we also need to store its index in primary_key_indices.
                        primary_key_indices.push(columns.len());
                    }
                    columns.push(request.column_schema.clone());
                });
            }
        }
        // add new columns without location info to last.
        columns_at_last.iter().for_each(|request| {
            if request.is_key {
                // If a key column is added, we also need to store its index in primary_key_indices.
                primary_key_indices.push(columns.len());
            }
            columns.push(request.column_schema.clone());
        });

        let mut builder = SchemaBuilder::try_from(columns)
            .with_context(|_| error::SchemaBuildSnafu {
                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
            })?
            // Also bump the schema version.
            .version(table_schema.version() + 1);
        // Carry over the metadata of the current schema.
        for (k, v) in table_schema.metadata().iter() {
            builder = builder.add_metadata(k, v);
        }
        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
        })?;

        // Existing columns may have shifted, so look the partition key columns up by name
        // in the new schema.
        let partition_key_indices = self
            .partition_key_indices
            .iter()
            .map(|idx| table_schema.column_name_by_index(*idx))
            // This unwrap is safe since we only add new columns.
            .map(|name| new_schema.column_index_by_name(name).unwrap())
            .collect();

        // value_indices would be generated automatically.
        let _ = meta_builder
            .schema(Arc::new(new_schema))
            .primary_key_indices(primary_key_indices)
            .partition_key_indices(partition_key_indices);

        Ok(meta_builder)
    }
669
670    fn remove_columns(
671        &self,
672        table_name: &str,
673        column_names: &[String],
674    ) -> Result<TableMetaBuilder> {
675        let table_schema = &self.schema;
676        let column_names: HashSet<_> = column_names.iter().collect();
677        let mut meta_builder = self.new_meta_builder();
678
679        let timestamp_index = table_schema.timestamp_index();
680        // Check whether columns are existing and not in primary key index.
681        for column_name in &column_names {
682            if let Some(index) = table_schema.column_index_by_name(column_name) {
683                // This is a linear search, but since there won't be too much columns, the performance should
684                // be acceptable.
685                ensure!(
686                    !self.primary_key_indices.contains(&index),
687                    error::RemoveColumnInIndexSnafu {
688                        column_name: *column_name,
689                        table_name,
690                    }
691                );
692
693                ensure!(
694                    !self.partition_key_indices.contains(&index),
695                    error::RemovePartitionColumnSnafu {
696                        column_name: *column_name,
697                        table_name,
698                    }
699                );
700
701                if let Some(ts_index) = timestamp_index {
702                    // Not allowed to remove column in timestamp index.
703                    ensure!(
704                        index != ts_index,
705                        error::RemoveColumnInIndexSnafu {
706                            column_name: table_schema.column_name_by_index(ts_index),
707                            table_name,
708                        }
709                    );
710                }
711            } else {
712                return error::ColumnNotExistsSnafu {
713                    column_name: *column_name,
714                    table_name,
715                }
716                .fail()?;
717            }
718        }
719
720        // Collect columns after removal.
721        let columns: Vec<_> = table_schema
722            .column_schemas()
723            .iter()
724            .filter(|column_schema| !column_names.contains(&column_schema.name))
725            .cloned()
726            .collect();
727
728        let mut builder = SchemaBuilder::try_from_columns(columns)
729            .with_context(|_| error::SchemaBuildSnafu {
730                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
731            })?
732            // Also bump the schema version.
733            .version(table_schema.version() + 1);
734        for (k, v) in table_schema.metadata().iter() {
735            builder = builder.add_metadata(k, v);
736        }
737        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
738            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
739        })?;
740
741        // Rebuild the indices of primary key columns.
742        let primary_key_indices = self
743            .primary_key_indices
744            .iter()
745            .map(|idx| table_schema.column_name_by_index(*idx))
746            // This unwrap is safe since we don't allow removing a primary key column.
747            .map(|name| new_schema.column_index_by_name(name).unwrap())
748            .collect();
749
750        let partition_key_indices = self
751            .partition_key_indices
752            .iter()
753            .map(|idx| table_schema.column_name_by_index(*idx))
754            // This unwrap is safe since we don't allow removing a partition key column.
755            .map(|name| new_schema.column_index_by_name(name).unwrap())
756            .collect();
757
758        let _ = meta_builder
759            .schema(Arc::new(new_schema))
760            .primary_key_indices(primary_key_indices)
761            .partition_key_indices(partition_key_indices);
762
763        Ok(meta_builder)
764    }
765
766    fn modify_column_types(
767        &self,
768        table_name: &str,
769        requests: &[ModifyColumnTypeRequest],
770    ) -> Result<TableMetaBuilder> {
771        let table_schema = &self.schema;
772        let mut meta_builder = self.new_meta_builder();
773
774        let mut modify_column_types = HashMap::with_capacity(requests.len());
775        let timestamp_index = table_schema.timestamp_index();
776
777        for col_to_change in requests {
778            let change_column_name = &col_to_change.column_name;
779
780            let index = table_schema
781                .column_index_by_name(change_column_name)
782                .with_context(|| error::ColumnNotExistsSnafu {
783                    column_name: change_column_name,
784                    table_name,
785                })?;
786
787            let column = &table_schema.column_schemas()[index];
788
789            ensure!(
790                !self.primary_key_indices.contains(&index),
791                error::InvalidAlterRequestSnafu {
792                    table: table_name,
793                    err: format!(
794                        "Not allowed to change primary key index column '{}'",
795                        column.name
796                    )
797                }
798            );
799
800            if let Some(ts_index) = timestamp_index {
801                // Not allowed to change column datatype in timestamp index.
802                ensure!(
803                    index != ts_index,
804                    error::InvalidAlterRequestSnafu {
805                        table: table_name,
806                        err: format!(
807                            "Not allowed to change timestamp index column '{}' datatype",
808                            column.name
809                        )
810                    }
811                );
812            }
813
814            ensure!(
815                modify_column_types
816                    .insert(&col_to_change.column_name, col_to_change)
817                    .is_none(),
818                error::InvalidAlterRequestSnafu {
819                    table: table_name,
820                    err: format!(
821                        "change column datatype {} more than once",
822                        col_to_change.column_name
823                    ),
824                }
825            );
826
827            ensure!(
828                column
829                    .data_type
830                    .can_arrow_type_cast_to(&col_to_change.target_type),
831                error::InvalidAlterRequestSnafu {
832                    table: table_name,
833                    err: format!(
834                        "column '{}' cannot be cast automatically to type '{}'",
835                        col_to_change.column_name, col_to_change.target_type,
836                    ),
837                }
838            );
839
840            ensure!(
841                column.is_nullable(),
842                error::InvalidAlterRequestSnafu {
843                    table: table_name,
844                    err: format!(
845                        "column '{}' must be nullable to ensure safe conversion.",
846                        col_to_change.column_name,
847                    ),
848                }
849            );
850        }
851        // Collect columns after changed.
852
853        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
854        for mut column in table_schema.column_schemas().iter().cloned() {
855            if let Some(change_column) = modify_column_types.get(&column.name) {
856                column.data_type = change_column.target_type.clone();
857                let new_default = if let Some(default_value) = column.default_constraint() {
858                    Some(
859                        default_value
860                            .cast_to_datatype(&change_column.target_type)
861                            .with_context(|_| error::CastDefaultValueSnafu {
862                                reason: format!(
863                                    "Failed to cast default value from {:?} to type {:?}",
864                                    default_value, &change_column.target_type
865                                ),
866                            })?,
867                    )
868                } else {
869                    None
870                };
871                column = column
872                    .clone()
873                    .with_default_constraint(new_default.clone())
874                    .with_context(|_| error::CastDefaultValueSnafu {
875                        reason: format!("Failed to set new default: {:?}", new_default),
876                    })?;
877            }
878            columns.push(column)
879        }
880
881        let mut builder = SchemaBuilder::try_from_columns(columns)
882            .with_context(|_| error::SchemaBuildSnafu {
883                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
884            })?
885            // Also bump the schema version.
886            .version(table_schema.version() + 1);
887        for (k, v) in table_schema.metadata().iter() {
888            builder = builder.add_metadata(k, v);
889        }
890        let new_schema = builder.build().with_context(|_| {
891            let column_names: Vec<_> = requests
892                .iter()
893                .map(|request| &request.column_name)
894                .collect();
895
896            error::SchemaBuildSnafu {
897                msg: format!(
898                    "Table {table_name} cannot change datatype with columns {column_names:?}"
899                ),
900            }
901        })?;
902
903        let _ = meta_builder
904            .schema(Arc::new(new_schema))
905            .primary_key_indices(self.primary_key_indices.clone());
906
907        Ok(meta_builder)
908    }
909
910    /// Split requests into different groups using column location info.
911    fn split_requests_by_column_location<'a>(
912        &self,
913        table_name: &str,
914        requests: &'a [AddColumnRequest],
915    ) -> Result<SplitResult<'a>> {
916        let table_schema = &self.schema;
917        let mut columns_at_first = Vec::new();
918        let mut columns_at_after = HashMap::new();
919        let mut columns_at_last = Vec::new();
920        let mut column_names = Vec::with_capacity(requests.len());
921        for request in requests {
922            // Check whether columns to add are already existing.
923            let column_name = &request.column_schema.name;
924            column_names.push(column_name.clone());
925            ensure!(
926                table_schema.column_schema_by_name(column_name).is_none(),
927                error::ColumnExistsSnafu {
928                    column_name,
929                    table_name,
930                }
931            );
932            match request.location.as_ref() {
933                Some(AddColumnLocation::First) => {
934                    columns_at_first.push(request);
935                }
936                Some(AddColumnLocation::After { column_name }) => {
937                    ensure!(
938                        table_schema.column_schema_by_name(column_name).is_some(),
939                        error::ColumnNotExistsSnafu {
940                            column_name,
941                            table_name,
942                        }
943                    );
944                    columns_at_after
945                        .entry(column_name.clone())
946                        .or_insert(Vec::new())
947                        .push(request);
948                }
949                None => {
950                    columns_at_last.push(request);
951                }
952            }
953        }
954        Ok(SplitResult {
955            columns_at_first,
956            columns_at_after,
957            columns_at_last,
958            column_names,
959        })
960    }
961
962    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
963        let table_schema = &self.schema;
964        let mut meta_builder = self.new_meta_builder();
965        let mut columns = Vec::with_capacity(table_schema.num_columns());
966        for column_schema in table_schema.column_schemas() {
967            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
968                // Drop default constraint.
969                ensure!(
970                    column_schema.default_constraint().is_some(),
971                    error::InvalidAlterRequestSnafu {
972                        table: table_name,
973                        err: format!("column {name} does not have a default value"),
974                    }
975                );
976                if !column_schema.is_nullable() {
977                    return error::InvalidAlterRequestSnafu {
978                        table: table_name,
979                        err: format!(
980                            "column {name} is not nullable and `default` cannot be dropped",
981                        ),
982                    }
983                    .fail();
984                }
985                let new_column_schema = column_schema.clone();
986                let new_column_schema = new_column_schema
987                    .with_default_constraint(None)
988                    .with_context(|_| error::SchemaBuildSnafu {
989                        msg: format!("Table {table_name} cannot drop default values"),
990                    })?;
991                columns.push(new_column_schema);
992            } else {
993                columns.push(column_schema.clone());
994            }
995        }
996
997        let mut builder = SchemaBuilder::try_from_columns(columns)
998            .with_context(|_| error::SchemaBuildSnafu {
999                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1000            })?
1001            // Also bump the schema version.
1002            .version(table_schema.version() + 1);
1003        for (k, v) in table_schema.metadata().iter() {
1004            builder = builder.add_metadata(k, v);
1005        }
1006        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1007            msg: format!("Table {table_name} cannot drop default values"),
1008        })?;
1009
1010        let _ = meta_builder.schema(Arc::new(new_schema));
1011
1012        Ok(meta_builder)
1013    }
1014}
/// Information of a table: its identity, names, type and metadata.
///
/// A `TableInfoBuilder` is derived for this struct using the owned builder
/// pattern; `name` and `meta` have no builder default.
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table.
    #[builder(setter(into))]
    pub name: String,
    /// Comment of the table.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Name of the catalog; the builder defaults it to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Name of the schema; the builder defaults it to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table.
    pub meta: TableMeta,
    /// Type of the table; the builder defaults it to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}

/// Reference-counted pointer to a [`TableInfo`].
pub type TableInfoRef = Arc<TableInfo>;
1037
1038impl TableInfo {
1039    pub fn table_id(&self) -> TableId {
1040        self.ident.table_id
1041    }
1042
1043    pub fn region_ids(&self) -> Vec<RegionId> {
1044        self.meta
1045            .region_numbers
1046            .iter()
1047            .map(|id| RegionId::new(self.table_id(), *id))
1048            .collect()
1049    }
1050    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1051    pub fn full_table_name(&self) -> String {
1052        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1053    }
1054
1055    pub fn get_db_string(&self) -> String {
1056        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1057    }
1058
1059    /// Returns true when the table is the metric engine's physical table.
1060    pub fn is_physical_table(&self) -> bool {
1061        self.meta
1062            .options
1063            .extra_options
1064            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1065    }
1066
1067    /// Return true if the table's TTL is `instant`.
1068    pub fn is_ttl_instant_table(&self) -> bool {
1069        self.meta
1070            .options
1071            .ttl
1072            .map(|t| t.is_instant())
1073            .unwrap_or(false)
1074    }
1075}
1076
1077impl TableInfoBuilder {
1078    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1079        Self {
1080            name: Some(name.into()),
1081            meta: Some(meta),
1082            ..Default::default()
1083        }
1084    }
1085
1086    pub fn table_id(mut self, id: TableId) -> Self {
1087        let ident = self.ident.get_or_insert_with(TableIdent::default);
1088        ident.table_id = id;
1089        self
1090    }
1091
1092    pub fn table_version(mut self, version: TableVersion) -> Self {
1093        let ident = self.ident.get_or_insert_with(TableIdent::default);
1094        ident.version = version;
1095        self
1096    }
1097}
1098
1099impl TableIdent {
1100    pub fn new(table_id: TableId) -> Self {
1101        Self {
1102            table_id,
1103            version: 0,
1104        }
1105    }
1106}
1107
1108impl From<TableId> for TableIdent {
1109    fn from(table_id: TableId) -> Self {
1110        Self::new(table_id)
1111    }
1112}
1113
/// Struct used to serialize and deserialize [`TableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct RawTableMeta {
    /// Raw (serializable) form of the table schema.
    pub schema: RawSchema,
    /// The indices of columns in primary key. Note that the index of timestamp column
    /// is not included. Order matters to this array.
    pub primary_key_indices: Vec<usize>,
    /// The indices of columns in value. The index of timestamp column is included.
    /// Order doesn't matter to this array.
    pub value_indices: Vec<usize>,
    /// Engine type of this table. Usually in lower case.
    pub engine: String,
    /// Next column id of a new column.
    /// It's used to ensure all columns with the same name across all regions have the same column id.
    pub next_column_id: ColumnId,
    /// Region numbers of the table.
    pub region_numbers: Vec<u32>,
    /// Options of the table.
    pub options: TableOptions,
    /// Timestamp recorded for the table, presumably its creation time.
    pub created_on: DateTime<Utc>,
    /// The indices of partition key columns. Order doesn't matter to this array.
    /// Defaults to empty when the field is absent from the serialized data.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
    /// Column ids, stored by position: the id at index `i` belongs to the column at
    /// index `i` of the schema.
    /// Note: This field may be empty for older versions that did not include this field.
    #[serde(default)]
    pub column_ids: Vec<ColumnId>,
}
1140
1141impl From<TableMeta> for RawTableMeta {
1142    fn from(meta: TableMeta) -> RawTableMeta {
1143        RawTableMeta {
1144            schema: RawSchema::from(&*meta.schema),
1145            primary_key_indices: meta.primary_key_indices,
1146            value_indices: meta.value_indices,
1147            engine: meta.engine,
1148            next_column_id: meta.next_column_id,
1149            region_numbers: meta.region_numbers,
1150            options: meta.options,
1151            created_on: meta.created_on,
1152            partition_key_indices: meta.partition_key_indices,
1153            column_ids: meta.column_ids,
1154        }
1155    }
1156}
1157
1158impl TryFrom<RawTableMeta> for TableMeta {
1159    type Error = ConvertError;
1160
1161    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1162        Ok(TableMeta {
1163            schema: Arc::new(Schema::try_from(raw.schema)?),
1164            primary_key_indices: raw.primary_key_indices,
1165            value_indices: raw.value_indices,
1166            engine: raw.engine,
1167            region_numbers: raw.region_numbers,
1168            next_column_id: raw.next_column_id,
1169            options: raw.options,
1170            created_on: raw.created_on,
1171            partition_key_indices: raw.partition_key_indices,
1172            column_ids: raw.column_ids,
1173        })
1174    }
1175}
1176
/// Struct used to serialize and deserialize [`TableInfo`].
///
/// It has the same fields as [`TableInfo`], except that `meta` is a
/// [`RawTableMeta`] instead of a [`TableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Id and version of the table.
    pub ident: TableIdent,
    /// Name of the table.
    pub name: String,
    /// Comment of the table.
    pub desc: Option<String>,
    /// Name of the catalog.
    pub catalog_name: String,
    /// Name of the schema.
    pub schema_name: String,
    /// Raw (serializable) metadata of the table.
    pub meta: RawTableMeta,
    /// Type of the table.
    pub table_type: TableType,
}
1188
1189impl RawTableInfo {
1190    /// Returns the map of column name to column id.
1191    ///
1192    /// Note: This method may return an empty map for older versions that did not include this field.
1193    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1194        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1195            None
1196        } else {
1197            Some(
1198                self.meta
1199                    .column_ids
1200                    .iter()
1201                    .enumerate()
1202                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1203                    .collect(),
1204            )
1205        }
1206    }
1207
1208    /// Sort the columns in [RawTableInfo], logical tables require it.
1209    pub fn sort_columns(&mut self) {
1210        let column_schemas = &self.meta.schema.column_schemas;
1211        let primary_keys = self
1212            .meta
1213            .primary_key_indices
1214            .iter()
1215            .map(|index| column_schemas[*index].name.clone())
1216            .collect::<HashSet<_>>();
1217
1218        let name_to_ids = self.name_to_ids().unwrap_or_default();
1219        self.meta
1220            .schema
1221            .column_schemas
1222            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1223
1224        // Compute new indices of sorted columns
1225        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1226        let mut timestamp_index = None;
1227        let mut value_indices =
1228            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1229        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1230        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1231            if primary_keys.contains(&column_schema.name) {
1232                primary_key_indices.push(index);
1233            } else if column_schema.is_time_index() {
1234                value_indices.push(index);
1235                timestamp_index = Some(index);
1236            } else {
1237                value_indices.push(index);
1238            }
1239            if let Some(id) = name_to_ids.get(&column_schema.name) {
1240                column_ids.push(*id);
1241            }
1242        }
1243
1244        // Overwrite table meta
1245        self.meta.schema.timestamp_index = timestamp_index;
1246        self.meta.primary_key_indices = primary_key_indices;
1247        self.meta.value_indices = value_indices;
1248        self.meta.column_ids = column_ids;
1249    }
1250
1251    /// Extracts region options from table info.
1252    ///
1253    /// All "region options" are actually a copy of table options for redundancy.
1254    pub fn to_region_options(&self) -> HashMap<String, String> {
1255        HashMap::from(&self.meta.options)
1256    }
1257
1258    /// Returns the table reference.
1259    pub fn table_ref(&self) -> TableReference {
1260        TableReference::full(
1261            self.catalog_name.as_str(),
1262            self.schema_name.as_str(),
1263            self.name.as_str(),
1264        )
1265    }
1266}
1267
1268impl From<TableInfo> for RawTableInfo {
1269    fn from(info: TableInfo) -> RawTableInfo {
1270        RawTableInfo {
1271            ident: info.ident,
1272            name: info.name,
1273            desc: info.desc,
1274            catalog_name: info.catalog_name,
1275            schema_name: info.schema_name,
1276            meta: RawTableMeta::from(info.meta),
1277            table_type: info.table_type,
1278        }
1279    }
1280}
1281
1282impl TryFrom<RawTableInfo> for TableInfo {
1283    type Error = ConvertError;
1284
1285    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1286        Ok(TableInfo {
1287            ident: raw.ident,
1288            name: raw.name,
1289            desc: raw.desc,
1290            catalog_name: raw.catalog_name,
1291            schema_name: raw.schema_name,
1292            meta: TableMeta::try_from(raw.meta)?,
1293            table_type: raw.table_type,
1294        })
1295    }
1296}
1297
1298/// Set column fulltext options if it passed the validation.
1299///
1300/// Options allowed to modify:
1301/// * backend
1302///
1303/// Options not allowed to modify:
1304/// * analyzer
1305/// * case_sensitive
1306fn set_column_fulltext_options(
1307    column_schema: &mut ColumnSchema,
1308    column_name: &str,
1309    options: &FulltextOptions,
1310    current_options: Option<FulltextOptions>,
1311) -> Result<()> {
1312    if let Some(current_options) = current_options {
1313        ensure!(
1314            current_options.analyzer == options.analyzer
1315                && current_options.case_sensitive == options.case_sensitive,
1316            error::InvalidColumnOptionSnafu {
1317                column_name,
1318                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1319                current_options.analyzer, current_options.case_sensitive),
1320            }
1321        );
1322    }
1323
1324    column_schema
1325        .set_fulltext_options(options)
1326        .context(error::SetFulltextOptionsSnafu { column_name })?;
1327
1328    Ok(())
1329}
1330
1331fn unset_column_fulltext_options(
1332    column_schema: &mut ColumnSchema,
1333    column_name: &str,
1334    current_options: Option<FulltextOptions>,
1335) -> Result<()> {
1336    ensure!(
1337        current_options
1338            .as_ref()
1339            .is_some_and(|options| options.enable),
1340        error::InvalidColumnOptionSnafu {
1341            column_name,
1342            msg: "FULLTEXT index already disabled".to_string(),
1343        }
1344    );
1345
1346    let mut options = current_options.unwrap();
1347    options.enable = false;
1348    column_schema
1349        .set_fulltext_options(&options)
1350        .context(error::SetFulltextOptionsSnafu { column_name })?;
1351
1352    Ok(())
1353}
1354
1355fn set_column_skipping_index_options(
1356    column_schema: &mut ColumnSchema,
1357    column_name: &str,
1358    options: &SkippingIndexOptions,
1359) -> Result<()> {
1360    column_schema
1361        .set_skipping_options(options)
1362        .context(error::SetSkippingOptionsSnafu { column_name })?;
1363
1364    Ok(())
1365}
1366
1367fn unset_column_skipping_index_options(
1368    column_schema: &mut ColumnSchema,
1369    column_name: &str,
1370) -> Result<()> {
1371    column_schema
1372        .unset_skipping_options()
1373        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1374    Ok(())
1375}
1376
1377#[cfg(test)]
1378mod tests {
1379    use std::assert_matches::assert_matches;
1380
1381    use common_error::ext::ErrorExt;
1382    use common_error::status_code::StatusCode;
1383    use datatypes::data_type::ConcreteDataType;
1384    use datatypes::schema::{
1385        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1386    };
1387
1388    use super::*;
1389    use crate::Error;
1390
1391    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1392    fn new_test_schema() -> Schema {
1393        let column_schemas = vec![
1394            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1395            ColumnSchema::new(
1396                "ts",
1397                ConcreteDataType::timestamp_millisecond_datatype(),
1398                false,
1399            )
1400            .with_time_index(true),
1401            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1402        ];
1403        SchemaBuilder::try_from(column_schemas)
1404            .unwrap()
1405            .version(123)
1406            .build()
1407            .unwrap()
1408    }
1409
1410    #[test]
1411    fn test_raw_convert() {
1412        let schema = Arc::new(new_test_schema());
1413        let meta = TableMetaBuilder::empty()
1414            .schema(schema)
1415            .primary_key_indices(vec![0])
1416            .engine("engine")
1417            .next_column_id(3)
1418            .build()
1419            .unwrap();
1420        let info = TableInfoBuilder::default()
1421            .table_id(10)
1422            .table_version(5)
1423            .name("mytable")
1424            .meta(meta)
1425            .build()
1426            .unwrap();
1427
1428        let raw = RawTableInfo::from(info.clone());
1429        let info_new = TableInfo::try_from(raw).unwrap();
1430
1431        assert_eq!(info, info_new);
1432    }
1433
1434    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1435        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1436        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1437        let alter_kind = AlterKind::AddColumns {
1438            columns: vec![
1439                AddColumnRequest {
1440                    column_schema: new_tag,
1441                    is_key: true,
1442                    location: None,
1443                    add_if_not_exists: false,
1444                },
1445                AddColumnRequest {
1446                    column_schema: new_field,
1447                    is_key: false,
1448                    location: None,
1449                    add_if_not_exists: false,
1450                },
1451            ],
1452        };
1453
1454        let builder = meta
1455            .builder_with_alter_kind("my_table", &alter_kind)
1456            .unwrap();
1457        builder.build().unwrap()
1458    }
1459
1460    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1461        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1462        let new_field = ColumnSchema::new(
1463            "my_field_after_ts",
1464            ConcreteDataType::string_datatype(),
1465            true,
1466        );
1467        let yet_another_field = ColumnSchema::new(
1468            "yet_another_field_after_ts",
1469            ConcreteDataType::int64_datatype(),
1470            true,
1471        );
1472        let alter_kind = AlterKind::AddColumns {
1473            columns: vec![
1474                AddColumnRequest {
1475                    column_schema: new_tag,
1476                    is_key: true,
1477                    location: Some(AddColumnLocation::First),
1478                    add_if_not_exists: false,
1479                },
1480                AddColumnRequest {
1481                    column_schema: new_field,
1482                    is_key: false,
1483                    location: Some(AddColumnLocation::After {
1484                        column_name: "ts".to_string(),
1485                    }),
1486                    add_if_not_exists: false,
1487                },
1488                AddColumnRequest {
1489                    column_schema: yet_another_field,
1490                    is_key: true,
1491                    location: Some(AddColumnLocation::After {
1492                        column_name: "ts".to_string(),
1493                    }),
1494                    add_if_not_exists: false,
1495                },
1496            ],
1497        };
1498
1499        let builder = meta
1500            .builder_with_alter_kind("my_table", &alter_kind)
1501            .unwrap();
1502        builder.build().unwrap()
1503    }
1504
1505    #[test]
1506    fn test_add_columns() {
1507        let schema = Arc::new(new_test_schema());
1508        let meta = TableMetaBuilder::empty()
1509            .schema(schema)
1510            .primary_key_indices(vec![0])
1511            .engine("engine")
1512            .next_column_id(3)
1513            .build()
1514            .unwrap();
1515
1516        let new_meta = add_columns_to_meta(&meta);
1517        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1518
1519        let names: Vec<String> = new_meta
1520            .schema
1521            .column_schemas()
1522            .iter()
1523            .map(|column_schema| column_schema.name.clone())
1524            .collect();
1525        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1526        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1527        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1528    }
1529
1530    #[test]
1531    fn test_add_columns_multiple_times() {
1532        let schema = Arc::new(new_test_schema());
1533        let meta = TableMetaBuilder::empty()
1534            .schema(schema)
1535            .primary_key_indices(vec![0])
1536            .engine("engine")
1537            .next_column_id(3)
1538            .build()
1539            .unwrap();
1540
1541        let alter_kind = AlterKind::AddColumns {
1542            columns: vec![
1543                AddColumnRequest {
1544                    column_schema: ColumnSchema::new(
1545                        "col3",
1546                        ConcreteDataType::int32_datatype(),
1547                        true,
1548                    ),
1549                    is_key: true,
1550                    location: None,
1551                    add_if_not_exists: true,
1552                },
1553                AddColumnRequest {
1554                    column_schema: ColumnSchema::new(
1555                        "col3",
1556                        ConcreteDataType::int32_datatype(),
1557                        true,
1558                    ),
1559                    is_key: true,
1560                    location: None,
1561                    add_if_not_exists: true,
1562                },
1563            ],
1564        };
1565        let err = meta
1566            .builder_with_alter_kind("my_table", &alter_kind)
1567            .err()
1568            .unwrap();
1569        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1570    }
1571
1572    #[test]
1573    fn test_remove_columns() {
1574        let schema = Arc::new(new_test_schema());
1575        let meta = TableMetaBuilder::empty()
1576            .schema(schema.clone())
1577            .primary_key_indices(vec![0])
1578            .engine("engine")
1579            .next_column_id(3)
1580            .build()
1581            .unwrap();
1582        // Add more columns so we have enough candidate columns to remove.
1583        let meta = add_columns_to_meta(&meta);
1584
1585        let alter_kind = AlterKind::DropColumns {
1586            names: vec![String::from("col2"), String::from("my_field")],
1587        };
1588        let new_meta = meta
1589            .builder_with_alter_kind("my_table", &alter_kind)
1590            .unwrap()
1591            .build()
1592            .unwrap();
1593
1594        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1595
1596        let names: Vec<String> = new_meta
1597            .schema
1598            .column_schemas()
1599            .iter()
1600            .map(|column_schema| column_schema.name.clone())
1601            .collect();
1602        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1603        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1604        assert_eq!(&[1], &new_meta.value_indices[..]);
1605        assert_eq!(
1606            schema.timestamp_column(),
1607            new_meta.schema.timestamp_column()
1608        );
1609    }
1610
1611    #[test]
1612    fn test_remove_multiple_columns_before_timestamp() {
1613        let column_schemas = vec![
1614            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1615            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1616            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1617            ColumnSchema::new(
1618                "ts",
1619                ConcreteDataType::timestamp_millisecond_datatype(),
1620                false,
1621            )
1622            .with_time_index(true),
1623        ];
1624        let schema = Arc::new(
1625            SchemaBuilder::try_from(column_schemas)
1626                .unwrap()
1627                .version(123)
1628                .build()
1629                .unwrap(),
1630        );
1631        let meta = TableMetaBuilder::empty()
1632            .schema(schema.clone())
1633            .primary_key_indices(vec![1])
1634            .engine("engine")
1635            .next_column_id(4)
1636            .build()
1637            .unwrap();
1638
1639        // Remove columns in reverse order to test whether timestamp index is valid.
1640        let alter_kind = AlterKind::DropColumns {
1641            names: vec![String::from("col3"), String::from("col1")],
1642        };
1643        let new_meta = meta
1644            .builder_with_alter_kind("my_table", &alter_kind)
1645            .unwrap()
1646            .build()
1647            .unwrap();
1648
1649        let names: Vec<String> = new_meta
1650            .schema
1651            .column_schemas()
1652            .iter()
1653            .map(|column_schema| column_schema.name.clone())
1654            .collect();
1655        assert_eq!(&["col2", "ts"], &names[..]);
1656        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1657        assert_eq!(&[1], &new_meta.value_indices[..]);
1658        assert_eq!(
1659            schema.timestamp_column(),
1660            new_meta.schema.timestamp_column()
1661        );
1662    }
1663
1664    #[test]
1665    fn test_add_existing_column() {
1666        let schema = Arc::new(new_test_schema());
1667        let meta = TableMetaBuilder::empty()
1668            .schema(schema)
1669            .primary_key_indices(vec![0])
1670            .engine("engine")
1671            .next_column_id(3)
1672            .build()
1673            .unwrap();
1674
1675        let alter_kind = AlterKind::AddColumns {
1676            columns: vec![AddColumnRequest {
1677                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1678                is_key: false,
1679                location: None,
1680                add_if_not_exists: false,
1681            }],
1682        };
1683
1684        let err = meta
1685            .builder_with_alter_kind("my_table", &alter_kind)
1686            .err()
1687            .unwrap();
1688        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1689
1690        // Add if not exists
1691        let alter_kind = AlterKind::AddColumns {
1692            columns: vec![AddColumnRequest {
1693                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1694                is_key: true,
1695                location: None,
1696                add_if_not_exists: true,
1697            }],
1698        };
1699        let new_meta = meta
1700            .builder_with_alter_kind("my_table", &alter_kind)
1701            .unwrap()
1702            .build()
1703            .unwrap();
1704        assert_eq!(
1705            meta.schema.column_schemas(),
1706            new_meta.schema.column_schemas()
1707        );
1708        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1709    }
1710
1711    #[test]
1712    fn test_add_different_type_column() {
1713        let schema = Arc::new(new_test_schema());
1714        let meta = TableMetaBuilder::empty()
1715            .schema(schema)
1716            .primary_key_indices(vec![0])
1717            .engine("engine")
1718            .next_column_id(3)
1719            .build()
1720            .unwrap();
1721
1722        // Add if not exists, but different type.
1723        let alter_kind = AlterKind::AddColumns {
1724            columns: vec![AddColumnRequest {
1725                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1726                is_key: false,
1727                location: None,
1728                add_if_not_exists: true,
1729            }],
1730        };
1731        let err = meta
1732            .builder_with_alter_kind("my_table", &alter_kind)
1733            .err()
1734            .unwrap();
1735        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1736    }
1737
1738    #[test]
1739    fn test_add_invalid_column() {
1740        let schema = Arc::new(new_test_schema());
1741        let meta = TableMetaBuilder::empty()
1742            .schema(schema)
1743            .primary_key_indices(vec![0])
1744            .engine("engine")
1745            .next_column_id(3)
1746            .build()
1747            .unwrap();
1748
1749        // Not nullable and no default value.
1750        let alter_kind = AlterKind::AddColumns {
1751            columns: vec![AddColumnRequest {
1752                column_schema: ColumnSchema::new(
1753                    "weny",
1754                    ConcreteDataType::string_datatype(),
1755                    false,
1756                ),
1757                is_key: false,
1758                location: None,
1759                add_if_not_exists: false,
1760            }],
1761        };
1762
1763        let err = meta
1764            .builder_with_alter_kind("my_table", &alter_kind)
1765            .err()
1766            .unwrap();
1767        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1768    }
1769
1770    #[test]
1771    fn test_remove_unknown_column() {
1772        let schema = Arc::new(new_test_schema());
1773        let meta = TableMetaBuilder::empty()
1774            .schema(schema)
1775            .primary_key_indices(vec![0])
1776            .engine("engine")
1777            .next_column_id(3)
1778            .build()
1779            .unwrap();
1780
1781        let alter_kind = AlterKind::DropColumns {
1782            names: vec![String::from("unknown")],
1783        };
1784
1785        let err = meta
1786            .builder_with_alter_kind("my_table", &alter_kind)
1787            .err()
1788            .unwrap();
1789        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1790    }
1791
1792    #[test]
1793    fn test_change_unknown_column_data_type() {
1794        let schema = Arc::new(new_test_schema());
1795        let meta = TableMetaBuilder::empty()
1796            .schema(schema)
1797            .primary_key_indices(vec![0])
1798            .engine("engine")
1799            .next_column_id(3)
1800            .build()
1801            .unwrap();
1802
1803        let alter_kind = AlterKind::ModifyColumnTypes {
1804            columns: vec![ModifyColumnTypeRequest {
1805                column_name: "unknown".to_string(),
1806                target_type: ConcreteDataType::string_datatype(),
1807            }],
1808        };
1809
1810        let err = meta
1811            .builder_with_alter_kind("my_table", &alter_kind)
1812            .err()
1813            .unwrap();
1814        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1815    }
1816
1817    #[test]
1818    fn test_remove_key_column() {
1819        let schema = Arc::new(new_test_schema());
1820        let meta = TableMetaBuilder::empty()
1821            .schema(schema)
1822            .primary_key_indices(vec![0])
1823            .engine("engine")
1824            .next_column_id(3)
1825            .build()
1826            .unwrap();
1827
1828        // Remove column in primary key.
1829        let alter_kind = AlterKind::DropColumns {
1830            names: vec![String::from("col1")],
1831        };
1832
1833        let err = meta
1834            .builder_with_alter_kind("my_table", &alter_kind)
1835            .err()
1836            .unwrap();
1837        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1838
1839        // Remove timestamp column.
1840        let alter_kind = AlterKind::DropColumns {
1841            names: vec![String::from("ts")],
1842        };
1843
1844        let err = meta
1845            .builder_with_alter_kind("my_table", &alter_kind)
1846            .err()
1847            .unwrap();
1848        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1849    }
1850
1851    #[test]
1852    fn test_remove_partition_column() {
1853        let schema = Arc::new(new_test_schema());
1854        let meta = TableMetaBuilder::empty()
1855            .schema(schema)
1856            .primary_key_indices(vec![])
1857            .partition_key_indices(vec![0])
1858            .engine("engine")
1859            .next_column_id(3)
1860            .build()
1861            .unwrap();
1862        // Remove column in primary key.
1863        let alter_kind = AlterKind::DropColumns {
1864            names: vec![String::from("col1")],
1865        };
1866
1867        let err = meta
1868            .builder_with_alter_kind("my_table", &alter_kind)
1869            .err()
1870            .unwrap();
1871        assert_matches!(err, Error::RemovePartitionColumn { .. });
1872    }
1873
1874    #[test]
1875    fn test_change_key_column_data_type() {
1876        let schema = Arc::new(new_test_schema());
1877        let meta = TableMetaBuilder::empty()
1878            .schema(schema)
1879            .primary_key_indices(vec![0])
1880            .engine("engine")
1881            .next_column_id(3)
1882            .build()
1883            .unwrap();
1884
1885        // Remove column in primary key.
1886        let alter_kind = AlterKind::ModifyColumnTypes {
1887            columns: vec![ModifyColumnTypeRequest {
1888                column_name: "col1".to_string(),
1889                target_type: ConcreteDataType::string_datatype(),
1890            }],
1891        };
1892
1893        let err = meta
1894            .builder_with_alter_kind("my_table", &alter_kind)
1895            .err()
1896            .unwrap();
1897        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1898
1899        // Remove timestamp column.
1900        let alter_kind = AlterKind::ModifyColumnTypes {
1901            columns: vec![ModifyColumnTypeRequest {
1902                column_name: "ts".to_string(),
1903                target_type: ConcreteDataType::string_datatype(),
1904            }],
1905        };
1906
1907        let err = meta
1908            .builder_with_alter_kind("my_table", &alter_kind)
1909            .err()
1910            .unwrap();
1911        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1912    }
1913
1914    #[test]
1915    fn test_alloc_new_column() {
1916        let schema = Arc::new(new_test_schema());
1917        let mut meta = TableMetaBuilder::empty()
1918            .schema(schema)
1919            .primary_key_indices(vec![0])
1920            .engine("engine")
1921            .next_column_id(3)
1922            .build()
1923            .unwrap();
1924        assert_eq!(3, meta.next_column_id);
1925
1926        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1927        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1928
1929        assert_eq!(4, meta.next_column_id);
1930        assert_eq!(column_schema.name, desc.name);
1931    }
1932
1933    #[test]
1934    fn test_add_columns_with_location() {
1935        let schema = Arc::new(new_test_schema());
1936        let meta = TableMetaBuilder::empty()
1937            .schema(schema)
1938            .primary_key_indices(vec![0])
1939            // partition col: col1, col2
1940            .partition_key_indices(vec![0, 2])
1941            .engine("engine")
1942            .next_column_id(3)
1943            .build()
1944            .unwrap();
1945
1946        let new_meta = add_columns_to_meta_with_location(&meta);
1947        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1948
1949        let names: Vec<String> = new_meta
1950            .schema
1951            .column_schemas()
1952            .iter()
1953            .map(|column_schema| column_schema.name.clone())
1954            .collect();
1955        assert_eq!(
1956            &[
1957                "my_tag_first",               // primary key column
1958                "col1",                       // partition column
1959                "ts",                         // timestamp column
1960                "yet_another_field_after_ts", // primary key column
1961                "my_field_after_ts",          // value column
1962                "col2",                       // partition column
1963            ],
1964            &names[..]
1965        );
1966        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
1967        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
1968        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
1969    }
1970
1971    #[test]
1972    fn test_modify_column_fulltext_options() {
1973        let schema = Arc::new(new_test_schema());
1974        let meta = TableMetaBuilder::empty()
1975            .schema(schema)
1976            .primary_key_indices(vec![0])
1977            .engine("engine")
1978            .next_column_id(3)
1979            .build()
1980            .unwrap();
1981
1982        let alter_kind = AlterKind::SetIndex {
1983            options: SetIndexOptions::Fulltext {
1984                column_name: "col1".to_string(),
1985                options: FulltextOptions::default(),
1986            },
1987        };
1988        let err = meta
1989            .builder_with_alter_kind("my_table", &alter_kind)
1990            .err()
1991            .unwrap();
1992        assert_eq!(
1993            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
1994            err.to_string()
1995        );
1996
1997        // Add a string column and make it fulltext indexed
1998        let new_meta = add_columns_to_meta_with_location(&meta);
1999        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2000
2001        let alter_kind = AlterKind::SetIndex {
2002            options: SetIndexOptions::Fulltext {
2003                column_name: "my_tag_first".to_string(),
2004                options: FulltextOptions::new_unchecked(
2005                    true,
2006                    FulltextAnalyzer::Chinese,
2007                    true,
2008                    FulltextBackend::Bloom,
2009                    1000,
2010                    0.01,
2011                ),
2012            },
2013        };
2014        let new_meta = new_meta
2015            .builder_with_alter_kind("my_table", &alter_kind)
2016            .unwrap()
2017            .build()
2018            .unwrap();
2019        let column_schema = new_meta
2020            .schema
2021            .column_schema_by_name("my_tag_first")
2022            .unwrap();
2023        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2024        assert!(fulltext_options.enable);
2025        assert_eq!(
2026            datatypes::schema::FulltextAnalyzer::Chinese,
2027            fulltext_options.analyzer
2028        );
2029        assert!(fulltext_options.case_sensitive);
2030
2031        let alter_kind = AlterKind::UnsetIndex {
2032            options: UnsetIndexOptions::Fulltext {
2033                column_name: "my_tag_first".to_string(),
2034            },
2035        };
2036        let new_meta = new_meta
2037            .builder_with_alter_kind("my_table", &alter_kind)
2038            .unwrap()
2039            .build()
2040            .unwrap();
2041        let column_schema = new_meta
2042            .schema
2043            .column_schema_by_name("my_tag_first")
2044            .unwrap();
2045        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2046        assert!(!fulltext_options.enable);
2047    }
2048}