table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39    TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46/// Indicates whether and how a filter expression can be handled by a
47/// Table for table scans.
48#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50    /// The expression cannot be used by the provider.
51    Unsupported,
52    /// The expression can be used to help minimise the data retrieved,
53    /// but the provider cannot guarantee that all returned tuples
54    /// satisfy the filter. The Filter plan node containing this expression
55    /// will be preserved.
56    Inexact,
57    /// The provider guarantees that all returned data satisfies this
58    /// filter expression. The Filter plan node containing this expression
59    /// will be removed.
60    Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
83/// Indicates the type of this table for metadata/catalog purposes.
84#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86    /// An ordinary physical table.
87    Base,
88    /// A non-materialised table that itself uses a query internally to provide data.
89    View,
90    /// A transient table.
91    Temporary,
92}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105    fn from(t: TableType) -> datafusion::datasource::TableType {
106        match t {
107            TableType::Base => datafusion::datasource::TableType::Base,
108            TableType::View => datafusion::datasource::TableType::View,
109            TableType::Temporary => datafusion::datasource::TableType::Temporary,
110        }
111    }
112}
113
114/// Identifier of the table.
115#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
116pub struct TableIdent {
117    /// Unique id of this table.
118    pub table_id: TableId,
119    /// Version of the table, bumped when metadata (such as schema) of the table
120    /// being changed.
121    pub version: TableVersion,
122}
123
124/// The table metadata.
125///
126/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
127#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
128#[builder(pattern = "mutable", custom_constructor)]
129pub struct TableMeta {
130    pub schema: SchemaRef,
131    /// The indices of columns in primary key. Note that the index of timestamp column
132    /// is not included in these indices.
133    pub primary_key_indices: Vec<usize>,
134    #[builder(default = "self.default_value_indices()?")]
135    pub value_indices: Vec<usize>,
136    #[builder(default, setter(into))]
137    pub engine: String,
138    #[builder(default, setter(into))]
139    pub region_numbers: Vec<u32>,
140    pub next_column_id: ColumnId,
141    /// Table options.
142    #[builder(default)]
143    pub options: TableOptions,
144    #[builder(default = "Utc::now()")]
145    pub created_on: DateTime<Utc>,
146    #[builder(default = "self.default_updated_on()")]
147    pub updated_on: DateTime<Utc>,
148    #[builder(default = "Vec::new()")]
149    pub partition_key_indices: Vec<usize>,
150    #[builder(default = "Vec::new()")]
151    pub column_ids: Vec<ColumnId>,
152}
153
154impl TableMetaBuilder {
155    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
156    #[cfg(any(test, feature = "testing"))]
157    pub fn empty() -> Self {
158        Self {
159            schema: None,
160            primary_key_indices: None,
161            value_indices: None,
162            engine: None,
163            region_numbers: None,
164            next_column_id: None,
165            options: None,
166            created_on: None,
167            updated_on: None,
168            partition_key_indices: None,
169            column_ids: None,
170        }
171    }
172}
173
174impl TableMetaBuilder {
175    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
176        match (&self.primary_key_indices, &self.schema) {
177            (Some(v), Some(schema)) => {
178                let column_schemas = schema.column_schemas();
179                Ok((0..column_schemas.len())
180                    .filter(|idx| !v.contains(idx))
181                    .collect())
182            }
183            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
184        }
185    }
186
187    fn default_updated_on(&self) -> DateTime<Utc> {
188        self.created_on.unwrap_or_default()
189    }
190
191    pub fn new_external_table() -> Self {
192        Self {
193            schema: None,
194            primary_key_indices: Some(Vec::new()),
195            value_indices: Some(Vec::new()),
196            engine: None,
197            region_numbers: Some(Vec::new()),
198            next_column_id: Some(0),
199            options: None,
200            created_on: None,
201            updated_on: None,
202            partition_key_indices: None,
203            column_ids: None,
204        }
205    }
206}
207
208/// The result after splitting requests by column location info.
209struct SplitResult<'a> {
210    /// column requests should be added at first place.
211    columns_at_first: Vec<&'a AddColumnRequest>,
212    /// column requests should be added after already exist columns.
213    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
214    /// column requests should be added at last place.
215    columns_at_last: Vec<&'a AddColumnRequest>,
216    /// all column names should be added.
217    column_names: Vec<String>,
218}
219
220impl TableMeta {
221    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
222        let columns_schemas = &self.schema.column_schemas();
223        self.primary_key_indices
224            .iter()
225            .map(|idx| &columns_schemas[*idx].name)
226    }
227
228    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
229        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
230        let columns_schemas = self.schema.column_schemas();
231        let primary_key_indices = &self.primary_key_indices;
232        columns_schemas
233            .iter()
234            .enumerate()
235            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
236            .map(|(_, cs)| &cs.name)
237    }
238
239    pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
240        let columns_schemas = &self.schema.column_schemas();
241        self.partition_key_indices
242            .iter()
243            .map(|idx| &columns_schemas[*idx].name)
244    }
245
246    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
247    ///
248    /// The returned builder would derive the next column id of this meta.
249    pub fn builder_with_alter_kind(
250        &self,
251        table_name: &str,
252        alter_kind: &AlterKind,
253    ) -> Result<TableMetaBuilder> {
254        let mut builder = match alter_kind {
255            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
256            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
257            AlterKind::ModifyColumnTypes { columns } => {
258                self.modify_column_types(table_name, columns)
259            }
260            // No need to rebuild table meta when renaming tables.
261            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
262            AlterKind::SetTableOptions { options } => self.set_table_options(options),
263            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
264            AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
265            AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
266            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
267            AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
268        }?;
269        let _ = builder.updated_on(Utc::now());
270        Ok(builder)
271    }
272
273    /// Creates a [TableMetaBuilder] with modified table options.
274    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
275        let mut new_options = self.options.clone();
276
277        for request in requests {
278            match request {
279                SetRegionOption::Ttl(new_ttl) => {
280                    new_options.ttl = *new_ttl;
281                }
282                SetRegionOption::Twsc(key, value) => {
283                    if !value.is_empty() {
284                        new_options.extra_options.insert(key.clone(), value.clone());
285                        // Ensure node restart correctly.
286                        new_options.extra_options.insert(
287                            COMPACTION_TYPE.to_string(),
288                            COMPACTION_TYPE_TWCS.to_string(),
289                        );
290                    } else {
291                        // Invalidate the previous change option if an empty value has been set.
292                        new_options.extra_options.remove(key.as_str());
293                    }
294                }
295            }
296        }
297        let mut builder = self.new_meta_builder();
298        builder.options(new_options);
299
300        Ok(builder)
301    }
302
303    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
304        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
305        self.set_table_options(&requests)
306    }
307
308    fn set_indexes(
309        &self,
310        table_name: &str,
311        requests: &[SetIndexOption],
312    ) -> Result<TableMetaBuilder> {
313        let table_schema = &self.schema;
314        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
315        for request in requests {
316            let column_name = request.column_name();
317            table_schema
318                .column_index_by_name(column_name)
319                .with_context(|| error::ColumnNotExistsSnafu {
320                    column_name,
321                    table_name,
322                })?;
323            set_index_options
324                .entry(column_name)
325                .or_default()
326                .push(request);
327        }
328
329        let mut meta_builder = self.new_meta_builder();
330        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
331        for mut column in table_schema.column_schemas().iter().cloned() {
332            if let Some(request) = set_index_options.get(column.name.as_str()) {
333                for request in request {
334                    self.set_index(&mut column, request)?;
335                }
336            }
337            columns.push(column);
338        }
339
340        let mut builder = SchemaBuilder::try_from_columns(columns)
341            .with_context(|_| error::SchemaBuildSnafu {
342                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
343            })?
344            .version(table_schema.version() + 1);
345
346        for (k, v) in table_schema.metadata().iter() {
347            builder = builder.add_metadata(k, v);
348        }
349
350        let new_schema = builder.build().with_context(|_| {
351            let column_names = requests
352                .iter()
353                .map(|request| request.column_name())
354                .collect::<Vec<_>>();
355            error::SchemaBuildSnafu {
356                msg: format!(
357                    "Table {table_name} cannot set index options with columns {column_names:?}",
358                ),
359            }
360        })?;
361        let _ = meta_builder
362            .schema(Arc::new(new_schema))
363            .primary_key_indices(self.primary_key_indices.clone());
364
365        Ok(meta_builder)
366    }
367
368    fn unset_indexes(
369        &self,
370        table_name: &str,
371        requests: &[UnsetIndexOption],
372    ) -> Result<TableMetaBuilder> {
373        let table_schema = &self.schema;
374        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
375        for request in requests {
376            let column_name = request.column_name();
377            table_schema
378                .column_index_by_name(column_name)
379                .with_context(|| error::ColumnNotExistsSnafu {
380                    column_name,
381                    table_name,
382                })?;
383            set_index_options
384                .entry(column_name)
385                .or_default()
386                .push(request);
387        }
388
389        let mut meta_builder = self.new_meta_builder();
390        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
391        for mut column in table_schema.column_schemas().iter().cloned() {
392            if let Some(request) = set_index_options.get(column.name.as_str()) {
393                for request in request {
394                    self.unset_index(&mut column, request)?;
395                }
396            }
397            columns.push(column);
398        }
399
400        let mut builder = SchemaBuilder::try_from_columns(columns)
401            .with_context(|_| error::SchemaBuildSnafu {
402                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
403            })?
404            .version(table_schema.version() + 1);
405
406        for (k, v) in table_schema.metadata().iter() {
407            builder = builder.add_metadata(k, v);
408        }
409
410        let new_schema = builder.build().with_context(|_| {
411            let column_names = requests
412                .iter()
413                .map(|request| request.column_name())
414                .collect::<Vec<_>>();
415            error::SchemaBuildSnafu {
416                msg: format!(
417                    "Table {table_name} cannot set index options with columns {column_names:?}",
418                ),
419            }
420        })?;
421        let _ = meta_builder
422            .schema(Arc::new(new_schema))
423            .primary_key_indices(self.primary_key_indices.clone());
424
425        Ok(meta_builder)
426    }
427
428    fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
429        match request {
430            SetIndexOption::Fulltext {
431                column_name,
432                options,
433            } => {
434                ensure!(
435                    column_schema.data_type.is_string(),
436                    error::InvalidColumnOptionSnafu {
437                        column_name,
438                        msg: "FULLTEXT index only supports string type",
439                    }
440                );
441
442                let current_fulltext_options = column_schema
443                    .fulltext_options()
444                    .context(error::SetFulltextOptionsSnafu { column_name })?;
445                set_column_fulltext_options(
446                    column_schema,
447                    column_name,
448                    options,
449                    current_fulltext_options,
450                )?;
451            }
452            SetIndexOption::Inverted { column_name } => {
453                debug_assert_eq!(column_schema.name, *column_name);
454                column_schema.set_inverted_index(true);
455            }
456            SetIndexOption::Skipping {
457                column_name,
458                options,
459            } => {
460                set_column_skipping_index_options(column_schema, column_name, options)?;
461            }
462        }
463
464        Ok(())
465    }
466
467    fn unset_index(
468        &self,
469        column_schema: &mut ColumnSchema,
470        request: &UnsetIndexOption,
471    ) -> Result<()> {
472        match request {
473            UnsetIndexOption::Fulltext { column_name } => {
474                let current_fulltext_options = column_schema
475                    .fulltext_options()
476                    .context(error::SetFulltextOptionsSnafu { column_name })?;
477                unset_column_fulltext_options(
478                    column_schema,
479                    column_name,
480                    current_fulltext_options.clone(),
481                )?
482            }
483            UnsetIndexOption::Inverted { .. } => {
484                column_schema.set_inverted_index(false);
485            }
486            UnsetIndexOption::Skipping { column_name } => {
487                unset_column_skipping_index_options(column_schema, column_name)?;
488            }
489        }
490
491        Ok(())
492    }
493
494    // TODO(yingwen): Remove this.
495    /// Allocate a new column for the table.
496    ///
497    /// This method would bump the `next_column_id` of the meta.
498    pub fn alloc_new_column(
499        &mut self,
500        table_name: &str,
501        new_column: &ColumnSchema,
502    ) -> Result<ColumnDescriptor> {
503        let desc = ColumnDescriptorBuilder::new(
504            self.next_column_id as ColumnId,
505            &new_column.name,
506            new_column.data_type.clone(),
507        )
508        .is_nullable(new_column.is_nullable())
509        .default_constraint(new_column.default_constraint().cloned())
510        .build()
511        .context(error::BuildColumnDescriptorSnafu {
512            table_name,
513            column_name: &new_column.name,
514        })?;
515
516        // Bump next column id.
517        self.next_column_id += 1;
518
519        Ok(desc)
520    }
521
522    /// Create a [`TableMetaBuilder`] from the current TableMeta.
523    fn new_meta_builder(&self) -> TableMetaBuilder {
524        let mut builder = TableMetaBuilder::from(self);
525        // Manually remove value_indices.
526        builder.value_indices = None;
527        builder
528    }
529
530    // TODO(yingwen): Tests add if not exists.
531    fn add_columns(
532        &self,
533        table_name: &str,
534        requests: &[AddColumnRequest],
535    ) -> Result<TableMetaBuilder> {
536        let table_schema = &self.schema;
537        let mut meta_builder = self.new_meta_builder();
538        let original_primary_key_indices: HashSet<&usize> =
539            self.primary_key_indices.iter().collect();
540
541        let mut names = HashSet::with_capacity(requests.len());
542        let mut new_columns = Vec::with_capacity(requests.len());
543        for col_to_add in requests {
544            if let Some(column_schema) =
545                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
546            {
547                // If the column already exists.
548                ensure!(
549                    col_to_add.add_if_not_exists,
550                    error::ColumnExistsSnafu {
551                        table_name,
552                        column_name: &col_to_add.column_schema.name
553                    },
554                );
555
556                // Checks if the type is the same
557                ensure!(
558                    column_schema.data_type == col_to_add.column_schema.data_type,
559                    error::InvalidAlterRequestSnafu {
560                        table: table_name,
561                        err: format!(
562                            "column {} already exists with different type {:?}",
563                            col_to_add.column_schema.name, column_schema.data_type,
564                        ),
565                    }
566                );
567            } else {
568                // A new column.
569                // Ensures we only add a column once.
570                ensure!(
571                    names.insert(&col_to_add.column_schema.name),
572                    error::InvalidAlterRequestSnafu {
573                        table: table_name,
574                        err: format!(
575                            "add column {} more than once",
576                            col_to_add.column_schema.name
577                        ),
578                    }
579                );
580
581                ensure!(
582                    col_to_add.column_schema.is_nullable()
583                        || col_to_add.column_schema.default_constraint().is_some(),
584                    error::InvalidAlterRequestSnafu {
585                        table: table_name,
586                        err: format!(
587                            "no default value for column {}",
588                            col_to_add.column_schema.name
589                        ),
590                    },
591                );
592
593                new_columns.push(col_to_add.clone());
594            }
595        }
596        let requests = &new_columns[..];
597
598        let SplitResult {
599            columns_at_first,
600            columns_at_after,
601            columns_at_last,
602            column_names,
603        } = self.split_requests_by_column_location(table_name, requests)?;
604        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
605        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
606        // add new columns with FIRST, and in reverse order of requests.
607        columns_at_first.iter().rev().for_each(|request| {
608            if request.is_key {
609                // If a key column is added, we also need to store its index in primary_key_indices.
610                primary_key_indices.push(columns.len());
611            }
612            columns.push(request.column_schema.clone());
613        });
614        // add existed columns in original order and handle new columns with AFTER.
615        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
616            if original_primary_key_indices.contains(&index) {
617                primary_key_indices.push(columns.len());
618            }
619            columns.push(column_schema.clone());
620            if let Some(requests) = columns_at_after.get(&column_schema.name) {
621                requests.iter().rev().for_each(|request| {
622                    if request.is_key {
623                        // If a key column is added, we also need to store its index in primary_key_indices.
624                        primary_key_indices.push(columns.len());
625                    }
626                    columns.push(request.column_schema.clone());
627                });
628            }
629        }
630        // add new columns without location info to last.
631        columns_at_last.iter().for_each(|request| {
632            if request.is_key {
633                // If a key column is added, we also need to store its index in primary_key_indices.
634                primary_key_indices.push(columns.len());
635            }
636            columns.push(request.column_schema.clone());
637        });
638
639        let mut builder = SchemaBuilder::try_from(columns)
640            .with_context(|_| error::SchemaBuildSnafu {
641                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
642            })?
643            // Also bump the schema version.
644            .version(table_schema.version() + 1);
645        for (k, v) in table_schema.metadata().iter() {
646            builder = builder.add_metadata(k, v);
647        }
648        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
649            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
650        })?;
651
652        let partition_key_indices = self
653            .partition_key_indices
654            .iter()
655            .map(|idx| table_schema.column_name_by_index(*idx))
656            // This unwrap is safe since we only add new columns.
657            .map(|name| new_schema.column_index_by_name(name).unwrap())
658            .collect();
659
660        // value_indices would be generated automatically.
661        let _ = meta_builder
662            .schema(Arc::new(new_schema))
663            .primary_key_indices(primary_key_indices)
664            .partition_key_indices(partition_key_indices);
665
666        Ok(meta_builder)
667    }
668
669    fn remove_columns(
670        &self,
671        table_name: &str,
672        column_names: &[String],
673    ) -> Result<TableMetaBuilder> {
674        let table_schema = &self.schema;
675        let column_names: HashSet<_> = column_names.iter().collect();
676        let mut meta_builder = self.new_meta_builder();
677
678        let timestamp_index = table_schema.timestamp_index();
679        // Check whether columns are existing and not in primary key index.
680        for column_name in &column_names {
681            if let Some(index) = table_schema.column_index_by_name(column_name) {
682                // This is a linear search, but since there won't be too much columns, the performance should
683                // be acceptable.
684                ensure!(
685                    !self.primary_key_indices.contains(&index),
686                    error::RemoveColumnInIndexSnafu {
687                        column_name: *column_name,
688                        table_name,
689                    }
690                );
691
692                ensure!(
693                    !self.partition_key_indices.contains(&index),
694                    error::RemovePartitionColumnSnafu {
695                        column_name: *column_name,
696                        table_name,
697                    }
698                );
699
700                if let Some(ts_index) = timestamp_index {
701                    // Not allowed to remove column in timestamp index.
702                    ensure!(
703                        index != ts_index,
704                        error::RemoveColumnInIndexSnafu {
705                            column_name: table_schema.column_name_by_index(ts_index),
706                            table_name,
707                        }
708                    );
709                }
710            } else {
711                return error::ColumnNotExistsSnafu {
712                    column_name: *column_name,
713                    table_name,
714                }
715                .fail()?;
716            }
717        }
718
719        // Collect columns after removal.
720        let columns: Vec<_> = table_schema
721            .column_schemas()
722            .iter()
723            .filter(|column_schema| !column_names.contains(&column_schema.name))
724            .cloned()
725            .collect();
726
727        let mut builder = SchemaBuilder::try_from_columns(columns)
728            .with_context(|_| error::SchemaBuildSnafu {
729                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
730            })?
731            // Also bump the schema version.
732            .version(table_schema.version() + 1);
733        for (k, v) in table_schema.metadata().iter() {
734            builder = builder.add_metadata(k, v);
735        }
736        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
737            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
738        })?;
739
740        // Rebuild the indices of primary key columns.
741        let primary_key_indices = self
742            .primary_key_indices
743            .iter()
744            .map(|idx| table_schema.column_name_by_index(*idx))
745            // This unwrap is safe since we don't allow removing a primary key column.
746            .map(|name| new_schema.column_index_by_name(name).unwrap())
747            .collect();
748
749        let partition_key_indices = self
750            .partition_key_indices
751            .iter()
752            .map(|idx| table_schema.column_name_by_index(*idx))
753            // This unwrap is safe since we don't allow removing a partition key column.
754            .map(|name| new_schema.column_index_by_name(name).unwrap())
755            .collect();
756
757        let _ = meta_builder
758            .schema(Arc::new(new_schema))
759            .primary_key_indices(primary_key_indices)
760            .partition_key_indices(partition_key_indices);
761
762        Ok(meta_builder)
763    }
764
765    fn modify_column_types(
766        &self,
767        table_name: &str,
768        requests: &[ModifyColumnTypeRequest],
769    ) -> Result<TableMetaBuilder> {
770        let table_schema = &self.schema;
771        let mut meta_builder = self.new_meta_builder();
772
773        let mut modify_column_types = HashMap::with_capacity(requests.len());
774        let timestamp_index = table_schema.timestamp_index();
775
776        for col_to_change in requests {
777            let change_column_name = &col_to_change.column_name;
778
779            let index = table_schema
780                .column_index_by_name(change_column_name)
781                .with_context(|| error::ColumnNotExistsSnafu {
782                    column_name: change_column_name,
783                    table_name,
784                })?;
785
786            let column = &table_schema.column_schemas()[index];
787
788            ensure!(
789                !self.primary_key_indices.contains(&index),
790                error::InvalidAlterRequestSnafu {
791                    table: table_name,
792                    err: format!(
793                        "Not allowed to change primary key index column '{}'",
794                        column.name
795                    )
796                }
797            );
798
799            if let Some(ts_index) = timestamp_index {
800                // Not allowed to change column datatype in timestamp index.
801                ensure!(
802                    index != ts_index,
803                    error::InvalidAlterRequestSnafu {
804                        table: table_name,
805                        err: format!(
806                            "Not allowed to change timestamp index column '{}' datatype",
807                            column.name
808                        )
809                    }
810                );
811            }
812
813            ensure!(
814                modify_column_types
815                    .insert(&col_to_change.column_name, col_to_change)
816                    .is_none(),
817                error::InvalidAlterRequestSnafu {
818                    table: table_name,
819                    err: format!(
820                        "change column datatype {} more than once",
821                        col_to_change.column_name
822                    ),
823                }
824            );
825
826            ensure!(
827                column
828                    .data_type
829                    .can_arrow_type_cast_to(&col_to_change.target_type),
830                error::InvalidAlterRequestSnafu {
831                    table: table_name,
832                    err: format!(
833                        "column '{}' cannot be cast automatically to type '{}'",
834                        col_to_change.column_name, col_to_change.target_type,
835                    ),
836                }
837            );
838
839            ensure!(
840                column.is_nullable(),
841                error::InvalidAlterRequestSnafu {
842                    table: table_name,
843                    err: format!(
844                        "column '{}' must be nullable to ensure safe conversion.",
845                        col_to_change.column_name,
846                    ),
847                }
848            );
849        }
850        // Collect columns after changed.
851
852        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
853        for mut column in table_schema.column_schemas().iter().cloned() {
854            if let Some(change_column) = modify_column_types.get(&column.name) {
855                column.data_type = change_column.target_type.clone();
856                let new_default = if let Some(default_value) = column.default_constraint() {
857                    Some(
858                        default_value
859                            .cast_to_datatype(&change_column.target_type)
860                            .with_context(|_| error::CastDefaultValueSnafu {
861                                reason: format!(
862                                    "Failed to cast default value from {:?} to type {:?}",
863                                    default_value, &change_column.target_type
864                                ),
865                            })?,
866                    )
867                } else {
868                    None
869                };
870                column = column
871                    .clone()
872                    .with_default_constraint(new_default.clone())
873                    .with_context(|_| error::CastDefaultValueSnafu {
874                        reason: format!("Failed to set new default: {:?}", new_default),
875                    })?;
876            }
877            columns.push(column)
878        }
879
880        let mut builder = SchemaBuilder::try_from_columns(columns)
881            .with_context(|_| error::SchemaBuildSnafu {
882                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
883            })?
884            // Also bump the schema version.
885            .version(table_schema.version() + 1);
886        for (k, v) in table_schema.metadata().iter() {
887            builder = builder.add_metadata(k, v);
888        }
889        let new_schema = builder.build().with_context(|_| {
890            let column_names: Vec<_> = requests
891                .iter()
892                .map(|request| &request.column_name)
893                .collect();
894
895            error::SchemaBuildSnafu {
896                msg: format!(
897                    "Table {table_name} cannot change datatype with columns {column_names:?}"
898                ),
899            }
900        })?;
901
902        let _ = meta_builder
903            .schema(Arc::new(new_schema))
904            .primary_key_indices(self.primary_key_indices.clone());
905
906        Ok(meta_builder)
907    }
908
909    /// Split requests into different groups using column location info.
910    fn split_requests_by_column_location<'a>(
911        &self,
912        table_name: &str,
913        requests: &'a [AddColumnRequest],
914    ) -> Result<SplitResult<'a>> {
915        let table_schema = &self.schema;
916        let mut columns_at_first = Vec::new();
917        let mut columns_at_after = HashMap::new();
918        let mut columns_at_last = Vec::new();
919        let mut column_names = Vec::with_capacity(requests.len());
920        for request in requests {
921            // Check whether columns to add are already existing.
922            let column_name = &request.column_schema.name;
923            column_names.push(column_name.clone());
924            ensure!(
925                table_schema.column_schema_by_name(column_name).is_none(),
926                error::ColumnExistsSnafu {
927                    column_name,
928                    table_name,
929                }
930            );
931            match request.location.as_ref() {
932                Some(AddColumnLocation::First) => {
933                    columns_at_first.push(request);
934                }
935                Some(AddColumnLocation::After { column_name }) => {
936                    ensure!(
937                        table_schema.column_schema_by_name(column_name).is_some(),
938                        error::ColumnNotExistsSnafu {
939                            column_name,
940                            table_name,
941                        }
942                    );
943                    columns_at_after
944                        .entry(column_name.clone())
945                        .or_insert(Vec::new())
946                        .push(request);
947                }
948                None => {
949                    columns_at_last.push(request);
950                }
951            }
952        }
953        Ok(SplitResult {
954            columns_at_first,
955            columns_at_after,
956            columns_at_last,
957            column_names,
958        })
959    }
960
961    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
962        let table_schema = &self.schema;
963        let mut meta_builder = self.new_meta_builder();
964        let mut columns = Vec::with_capacity(table_schema.num_columns());
965        for column_schema in table_schema.column_schemas() {
966            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
967                // Drop default constraint.
968                ensure!(
969                    column_schema.default_constraint().is_some(),
970                    error::InvalidAlterRequestSnafu {
971                        table: table_name,
972                        err: format!("column {name} does not have a default value"),
973                    }
974                );
975                if !column_schema.is_nullable() {
976                    return error::InvalidAlterRequestSnafu {
977                        table: table_name,
978                        err: format!(
979                            "column {name} is not nullable and `default` cannot be dropped",
980                        ),
981                    }
982                    .fail();
983                }
984                let new_column_schema = column_schema.clone();
985                let new_column_schema = new_column_schema
986                    .with_default_constraint(None)
987                    .with_context(|_| error::SchemaBuildSnafu {
988                        msg: format!("Table {table_name} cannot drop default values"),
989                    })?;
990                columns.push(new_column_schema);
991            } else {
992                columns.push(column_schema.clone());
993            }
994        }
995
996        let mut builder = SchemaBuilder::try_from_columns(columns)
997            .with_context(|_| error::SchemaBuildSnafu {
998                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
999            })?
1000            // Also bump the schema version.
1001            .version(table_schema.version() + 1);
1002        for (k, v) in table_schema.metadata().iter() {
1003            builder = builder.add_metadata(k, v);
1004        }
1005        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1006            msg: format!("Table {table_name} cannot drop default values"),
1007        })?;
1008
1009        let _ = meta_builder.schema(Arc::new(new_schema));
1010
1011        Ok(meta_builder)
1012    }
1013
1014    fn set_defaults(
1015        &self,
1016        table_name: &str,
1017        set_defaults: &[SetDefaultRequest],
1018    ) -> Result<TableMetaBuilder> {
1019        let table_schema = &self.schema;
1020        let mut meta_builder = self.new_meta_builder();
1021        let mut columns = Vec::with_capacity(table_schema.num_columns());
1022        for column_schema in table_schema.column_schemas() {
1023            if let Some(set_default) = set_defaults
1024                .iter()
1025                .find(|s| s.column_name == column_schema.name)
1026            {
1027                let new_column_schema = column_schema.clone();
1028                let new_column_schema = new_column_schema
1029                    .with_default_constraint(set_default.default_constraint.clone())
1030                    .with_context(|_| error::SchemaBuildSnafu {
1031                        msg: format!("Table {table_name} cannot set default values"),
1032                    })?;
1033                columns.push(new_column_schema);
1034            } else {
1035                columns.push(column_schema.clone());
1036            }
1037        }
1038
1039        let mut builder = SchemaBuilder::try_from_columns(columns)
1040            .with_context(|_| error::SchemaBuildSnafu {
1041                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1042            })?
1043            // Also bump the schema version.
1044            .version(table_schema.version() + 1);
1045        for (k, v) in table_schema.metadata().iter() {
1046            builder = builder.add_metadata(k, v);
1047        }
1048        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1049            msg: format!("Table {table_name} cannot set default values"),
1050        })?;
1051
1052        let _ = meta_builder.schema(Arc::new(new_schema));
1053
1054        Ok(meta_builder)
1055    }
1056}
1057
1058#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1059#[builder(pattern = "owned")]
1060pub struct TableInfo {
1061    /// Id and version of the table.
1062    #[builder(default, setter(into))]
1063    pub ident: TableIdent,
1064    /// Name of the table.
1065    #[builder(setter(into))]
1066    pub name: String,
1067    /// Comment of the table.
1068    #[builder(default, setter(into))]
1069    pub desc: Option<String>,
1070    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1071    pub catalog_name: String,
1072    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1073    pub schema_name: String,
1074    pub meta: TableMeta,
1075    #[builder(default = "TableType::Base")]
1076    pub table_type: TableType,
1077}
1078
1079pub type TableInfoRef = Arc<TableInfo>;
1080
1081impl TableInfo {
1082    pub fn table_id(&self) -> TableId {
1083        self.ident.table_id
1084    }
1085
1086    pub fn region_ids(&self) -> Vec<RegionId> {
1087        self.meta
1088            .region_numbers
1089            .iter()
1090            .map(|id| RegionId::new(self.table_id(), *id))
1091            .collect()
1092    }
1093    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1094    pub fn full_table_name(&self) -> String {
1095        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1096    }
1097
1098    pub fn get_db_string(&self) -> String {
1099        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1100    }
1101
1102    /// Returns true when the table is the metric engine's physical table.
1103    pub fn is_physical_table(&self) -> bool {
1104        self.meta
1105            .options
1106            .extra_options
1107            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1108    }
1109
1110    /// Return true if the table's TTL is `instant`.
1111    pub fn is_ttl_instant_table(&self) -> bool {
1112        self.meta
1113            .options
1114            .ttl
1115            .map(|t| t.is_instant())
1116            .unwrap_or(false)
1117    }
1118}
1119
1120impl TableInfoBuilder {
1121    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1122        Self {
1123            name: Some(name.into()),
1124            meta: Some(meta),
1125            ..Default::default()
1126        }
1127    }
1128
1129    pub fn table_id(mut self, id: TableId) -> Self {
1130        let ident = self.ident.get_or_insert_with(TableIdent::default);
1131        ident.table_id = id;
1132        self
1133    }
1134
1135    pub fn table_version(mut self, version: TableVersion) -> Self {
1136        let ident = self.ident.get_or_insert_with(TableIdent::default);
1137        ident.version = version;
1138        self
1139    }
1140}
1141
1142impl TableIdent {
1143    pub fn new(table_id: TableId) -> Self {
1144        Self {
1145            table_id,
1146            version: 0,
1147        }
1148    }
1149}
1150
1151impl From<TableId> for TableIdent {
1152    fn from(table_id: TableId) -> Self {
1153        Self::new(table_id)
1154    }
1155}
1156
1157/// Struct used to serialize and deserialize [`TableMeta`].
1158#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
1159pub struct RawTableMeta {
1160    pub schema: RawSchema,
1161    /// The indices of columns in primary key. Note that the index of timestamp column
1162    /// is not included. Order matters to this array.
1163    pub primary_key_indices: Vec<usize>,
1164    ///  The indices of columns in value. The index of timestamp column is included.
1165    /// Order doesn't matter to this array.
1166    pub value_indices: Vec<usize>,
1167    /// Engine type of this table. Usually in small case.
1168    pub engine: String,
1169    /// Next column id of a new column.
1170    /// It's used to ensure all columns with the same name across all regions have the same column id.
1171    pub next_column_id: ColumnId,
1172    pub region_numbers: Vec<u32>,
1173    pub options: TableOptions,
1174    pub created_on: DateTime<Utc>,
1175    pub updated_on: DateTime<Utc>,
1176    /// Order doesn't matter to this array.
1177    #[serde(default)]
1178    pub partition_key_indices: Vec<usize>,
1179    /// Map of column name to column id.
1180    /// Note: This field may be empty for older versions that did not include this field.
1181    #[serde(default)]
1182    pub column_ids: Vec<ColumnId>,
1183}
1184
1185impl<'de> Deserialize<'de> for RawTableMeta {
1186    fn deserialize<D>(
1187        deserializer: D,
1188    ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1189    where
1190        D: Deserializer<'de>,
1191    {
1192        #[derive(Deserialize)]
1193        struct Helper {
1194            schema: RawSchema,
1195            primary_key_indices: Vec<usize>,
1196            value_indices: Vec<usize>,
1197            engine: String,
1198            next_column_id: u32,
1199            region_numbers: Vec<u32>,
1200            options: TableOptions,
1201            created_on: DateTime<Utc>,
1202            updated_on: Option<DateTime<Utc>>,
1203            #[serde(default)]
1204            partition_key_indices: Vec<usize>,
1205            #[serde(default)]
1206            column_ids: Vec<ColumnId>,
1207        }
1208
1209        let h = Helper::deserialize(deserializer)?;
1210        Ok(RawTableMeta {
1211            schema: h.schema,
1212            primary_key_indices: h.primary_key_indices,
1213            value_indices: h.value_indices,
1214            engine: h.engine,
1215            next_column_id: h.next_column_id,
1216            region_numbers: h.region_numbers,
1217            options: h.options,
1218            created_on: h.created_on,
1219            updated_on: h.updated_on.unwrap_or(h.created_on),
1220            partition_key_indices: h.partition_key_indices,
1221            column_ids: h.column_ids,
1222        })
1223    }
1224}
1225
1226impl From<TableMeta> for RawTableMeta {
1227    fn from(meta: TableMeta) -> RawTableMeta {
1228        RawTableMeta {
1229            schema: RawSchema::from(&*meta.schema),
1230            primary_key_indices: meta.primary_key_indices,
1231            value_indices: meta.value_indices,
1232            engine: meta.engine,
1233            next_column_id: meta.next_column_id,
1234            region_numbers: meta.region_numbers,
1235            options: meta.options,
1236            created_on: meta.created_on,
1237            updated_on: meta.updated_on,
1238            partition_key_indices: meta.partition_key_indices,
1239            column_ids: meta.column_ids,
1240        }
1241    }
1242}
1243
1244impl TryFrom<RawTableMeta> for TableMeta {
1245    type Error = ConvertError;
1246
1247    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1248        Ok(TableMeta {
1249            schema: Arc::new(Schema::try_from(raw.schema)?),
1250            primary_key_indices: raw.primary_key_indices,
1251            value_indices: raw.value_indices,
1252            engine: raw.engine,
1253            region_numbers: raw.region_numbers,
1254            next_column_id: raw.next_column_id,
1255            options: raw.options,
1256            created_on: raw.created_on,
1257            updated_on: raw.updated_on,
1258            partition_key_indices: raw.partition_key_indices,
1259            column_ids: raw.column_ids,
1260        })
1261    }
1262}
1263
1264/// Struct used to serialize and deserialize [`TableInfo`].
1265#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1266pub struct RawTableInfo {
1267    pub ident: TableIdent,
1268    pub name: String,
1269    pub desc: Option<String>,
1270    pub catalog_name: String,
1271    pub schema_name: String,
1272    pub meta: RawTableMeta,
1273    pub table_type: TableType,
1274}
1275
1276impl RawTableInfo {
1277    /// Returns the map of column name to column id.
1278    ///
1279    /// Note: This method may return an empty map for older versions that did not include this field.
1280    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1281        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1282            None
1283        } else {
1284            Some(
1285                self.meta
1286                    .column_ids
1287                    .iter()
1288                    .enumerate()
1289                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1290                    .collect(),
1291            )
1292        }
1293    }
1294
1295    /// Sort the columns in [RawTableInfo], logical tables require it.
1296    pub fn sort_columns(&mut self) {
1297        let column_schemas = &self.meta.schema.column_schemas;
1298        let primary_keys = self
1299            .meta
1300            .primary_key_indices
1301            .iter()
1302            .map(|index| column_schemas[*index].name.clone())
1303            .collect::<HashSet<_>>();
1304
1305        let name_to_ids = self.name_to_ids().unwrap_or_default();
1306        self.meta
1307            .schema
1308            .column_schemas
1309            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1310
1311        // Compute new indices of sorted columns
1312        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1313        let mut timestamp_index = None;
1314        let mut value_indices =
1315            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1316        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1317        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1318            if primary_keys.contains(&column_schema.name) {
1319                primary_key_indices.push(index);
1320            } else if column_schema.is_time_index() {
1321                value_indices.push(index);
1322                timestamp_index = Some(index);
1323            } else {
1324                value_indices.push(index);
1325            }
1326            if let Some(id) = name_to_ids.get(&column_schema.name) {
1327                column_ids.push(*id);
1328            }
1329        }
1330
1331        // Overwrite table meta
1332        self.meta.schema.timestamp_index = timestamp_index;
1333        self.meta.primary_key_indices = primary_key_indices;
1334        self.meta.value_indices = value_indices;
1335        self.meta.column_ids = column_ids;
1336    }
1337
1338    /// Extracts region options from table info.
1339    ///
1340    /// All "region options" are actually a copy of table options for redundancy.
1341    pub fn to_region_options(&self) -> HashMap<String, String> {
1342        HashMap::from(&self.meta.options)
1343    }
1344
1345    /// Returns the table reference.
1346    pub fn table_ref(&self) -> TableReference<'_> {
1347        TableReference::full(
1348            self.catalog_name.as_str(),
1349            self.schema_name.as_str(),
1350            self.name.as_str(),
1351        )
1352    }
1353}
1354
1355impl From<TableInfo> for RawTableInfo {
1356    fn from(info: TableInfo) -> RawTableInfo {
1357        RawTableInfo {
1358            ident: info.ident,
1359            name: info.name,
1360            desc: info.desc,
1361            catalog_name: info.catalog_name,
1362            schema_name: info.schema_name,
1363            meta: RawTableMeta::from(info.meta),
1364            table_type: info.table_type,
1365        }
1366    }
1367}
1368
1369impl TryFrom<RawTableInfo> for TableInfo {
1370    type Error = ConvertError;
1371
1372    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1373        Ok(TableInfo {
1374            ident: raw.ident,
1375            name: raw.name,
1376            desc: raw.desc,
1377            catalog_name: raw.catalog_name,
1378            schema_name: raw.schema_name,
1379            meta: TableMeta::try_from(raw.meta)?,
1380            table_type: raw.table_type,
1381        })
1382    }
1383}
1384
1385/// Set column fulltext options if it passed the validation.
1386///
1387/// Options allowed to modify:
1388/// * backend
1389///
1390/// Options not allowed to modify:
1391/// * analyzer
1392/// * case_sensitive
1393fn set_column_fulltext_options(
1394    column_schema: &mut ColumnSchema,
1395    column_name: &str,
1396    options: &FulltextOptions,
1397    current_options: Option<FulltextOptions>,
1398) -> Result<()> {
1399    if let Some(current_options) = current_options {
1400        ensure!(
1401            current_options.analyzer == options.analyzer
1402                && current_options.case_sensitive == options.case_sensitive,
1403            error::InvalidColumnOptionSnafu {
1404                column_name,
1405                msg: format!(
1406                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1407                    current_options.analyzer, current_options.case_sensitive
1408                ),
1409            }
1410        );
1411    }
1412
1413    column_schema
1414        .set_fulltext_options(options)
1415        .context(error::SetFulltextOptionsSnafu { column_name })?;
1416
1417    Ok(())
1418}
1419
1420fn unset_column_fulltext_options(
1421    column_schema: &mut ColumnSchema,
1422    column_name: &str,
1423    current_options: Option<FulltextOptions>,
1424) -> Result<()> {
1425    ensure!(
1426        current_options
1427            .as_ref()
1428            .is_some_and(|options| options.enable),
1429        error::InvalidColumnOptionSnafu {
1430            column_name,
1431            msg: "FULLTEXT index already disabled".to_string(),
1432        }
1433    );
1434
1435    let mut options = current_options.unwrap();
1436    options.enable = false;
1437    column_schema
1438        .set_fulltext_options(&options)
1439        .context(error::SetFulltextOptionsSnafu { column_name })?;
1440
1441    Ok(())
1442}
1443
1444fn set_column_skipping_index_options(
1445    column_schema: &mut ColumnSchema,
1446    column_name: &str,
1447    options: &SkippingIndexOptions,
1448) -> Result<()> {
1449    column_schema
1450        .set_skipping_options(options)
1451        .context(error::SetSkippingOptionsSnafu { column_name })?;
1452
1453    Ok(())
1454}
1455
1456fn unset_column_skipping_index_options(
1457    column_schema: &mut ColumnSchema,
1458    column_name: &str,
1459) -> Result<()> {
1460    column_schema
1461        .unset_skipping_options()
1462        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1463    Ok(())
1464}
1465
1466#[cfg(test)]
1467mod tests {
1468    use std::assert_matches::assert_matches;
1469
1470    use common_error::ext::ErrorExt;
1471    use common_error::status_code::StatusCode;
1472    use datatypes::data_type::ConcreteDataType;
1473    use datatypes::schema::{
1474        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1475    };
1476
1477    use super::*;
1478    use crate::Error;
1479
1480    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1481    fn new_test_schema() -> Schema {
1482        let column_schemas = vec![
1483            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1484            ColumnSchema::new(
1485                "ts",
1486                ConcreteDataType::timestamp_millisecond_datatype(),
1487                false,
1488            )
1489            .with_time_index(true),
1490            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1491        ];
1492        SchemaBuilder::try_from(column_schemas)
1493            .unwrap()
1494            .version(123)
1495            .build()
1496            .unwrap()
1497    }
1498
1499    #[test]
1500    fn test_raw_convert() {
1501        let schema = Arc::new(new_test_schema());
1502        let meta = TableMetaBuilder::empty()
1503            .schema(schema)
1504            .primary_key_indices(vec![0])
1505            .engine("engine")
1506            .next_column_id(3)
1507            .build()
1508            .unwrap();
1509        let info = TableInfoBuilder::default()
1510            .table_id(10)
1511            .table_version(5)
1512            .name("mytable")
1513            .meta(meta)
1514            .build()
1515            .unwrap();
1516
1517        let raw = RawTableInfo::from(info.clone());
1518        let info_new = TableInfo::try_from(raw).unwrap();
1519
1520        assert_eq!(info, info_new);
1521    }
1522
1523    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1524        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1525        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1526        let alter_kind = AlterKind::AddColumns {
1527            columns: vec![
1528                AddColumnRequest {
1529                    column_schema: new_tag,
1530                    is_key: true,
1531                    location: None,
1532                    add_if_not_exists: false,
1533                },
1534                AddColumnRequest {
1535                    column_schema: new_field,
1536                    is_key: false,
1537                    location: None,
1538                    add_if_not_exists: false,
1539                },
1540            ],
1541        };
1542
1543        let builder = meta
1544            .builder_with_alter_kind("my_table", &alter_kind)
1545            .unwrap();
1546        builder.build().unwrap()
1547    }
1548
1549    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1550        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1551        let new_field = ColumnSchema::new(
1552            "my_field_after_ts",
1553            ConcreteDataType::string_datatype(),
1554            true,
1555        );
1556        let yet_another_field = ColumnSchema::new(
1557            "yet_another_field_after_ts",
1558            ConcreteDataType::int64_datatype(),
1559            true,
1560        );
1561        let alter_kind = AlterKind::AddColumns {
1562            columns: vec![
1563                AddColumnRequest {
1564                    column_schema: new_tag,
1565                    is_key: true,
1566                    location: Some(AddColumnLocation::First),
1567                    add_if_not_exists: false,
1568                },
1569                AddColumnRequest {
1570                    column_schema: new_field,
1571                    is_key: false,
1572                    location: Some(AddColumnLocation::After {
1573                        column_name: "ts".to_string(),
1574                    }),
1575                    add_if_not_exists: false,
1576                },
1577                AddColumnRequest {
1578                    column_schema: yet_another_field,
1579                    is_key: true,
1580                    location: Some(AddColumnLocation::After {
1581                        column_name: "ts".to_string(),
1582                    }),
1583                    add_if_not_exists: false,
1584                },
1585            ],
1586        };
1587
1588        let builder = meta
1589            .builder_with_alter_kind("my_table", &alter_kind)
1590            .unwrap();
1591        builder.build().unwrap()
1592    }
1593
1594    #[test]
1595    fn test_add_columns() {
1596        let schema = Arc::new(new_test_schema());
1597        let meta = TableMetaBuilder::empty()
1598            .schema(schema)
1599            .primary_key_indices(vec![0])
1600            .engine("engine")
1601            .next_column_id(3)
1602            .build()
1603            .unwrap();
1604
1605        let new_meta = add_columns_to_meta(&meta);
1606        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1607
1608        let names: Vec<String> = new_meta
1609            .schema
1610            .column_schemas()
1611            .iter()
1612            .map(|column_schema| column_schema.name.clone())
1613            .collect();
1614        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1615        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1616        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1617    }
1618
1619    #[test]
1620    fn test_add_columns_multiple_times() {
1621        let schema = Arc::new(new_test_schema());
1622        let meta = TableMetaBuilder::empty()
1623            .schema(schema)
1624            .primary_key_indices(vec![0])
1625            .engine("engine")
1626            .next_column_id(3)
1627            .build()
1628            .unwrap();
1629
1630        let alter_kind = AlterKind::AddColumns {
1631            columns: vec![
1632                AddColumnRequest {
1633                    column_schema: ColumnSchema::new(
1634                        "col3",
1635                        ConcreteDataType::int32_datatype(),
1636                        true,
1637                    ),
1638                    is_key: true,
1639                    location: None,
1640                    add_if_not_exists: true,
1641                },
1642                AddColumnRequest {
1643                    column_schema: ColumnSchema::new(
1644                        "col3",
1645                        ConcreteDataType::int32_datatype(),
1646                        true,
1647                    ),
1648                    is_key: true,
1649                    location: None,
1650                    add_if_not_exists: true,
1651                },
1652            ],
1653        };
1654        let err = meta
1655            .builder_with_alter_kind("my_table", &alter_kind)
1656            .err()
1657            .unwrap();
1658        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1659    }
1660
1661    #[test]
1662    fn test_remove_columns() {
1663        let schema = Arc::new(new_test_schema());
1664        let meta = TableMetaBuilder::empty()
1665            .schema(schema.clone())
1666            .primary_key_indices(vec![0])
1667            .engine("engine")
1668            .next_column_id(3)
1669            .build()
1670            .unwrap();
1671        // Add more columns so we have enough candidate columns to remove.
1672        let meta = add_columns_to_meta(&meta);
1673
1674        let alter_kind = AlterKind::DropColumns {
1675            names: vec![String::from("col2"), String::from("my_field")],
1676        };
1677        let new_meta = meta
1678            .builder_with_alter_kind("my_table", &alter_kind)
1679            .unwrap()
1680            .build()
1681            .unwrap();
1682
1683        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1684
1685        let names: Vec<String> = new_meta
1686            .schema
1687            .column_schemas()
1688            .iter()
1689            .map(|column_schema| column_schema.name.clone())
1690            .collect();
1691        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1692        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1693        assert_eq!(&[1], &new_meta.value_indices[..]);
1694        assert_eq!(
1695            schema.timestamp_column(),
1696            new_meta.schema.timestamp_column()
1697        );
1698    }
1699
1700    #[test]
1701    fn test_remove_multiple_columns_before_timestamp() {
1702        let column_schemas = vec![
1703            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1704            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1705            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1706            ColumnSchema::new(
1707                "ts",
1708                ConcreteDataType::timestamp_millisecond_datatype(),
1709                false,
1710            )
1711            .with_time_index(true),
1712        ];
1713        let schema = Arc::new(
1714            SchemaBuilder::try_from(column_schemas)
1715                .unwrap()
1716                .version(123)
1717                .build()
1718                .unwrap(),
1719        );
1720        let meta = TableMetaBuilder::empty()
1721            .schema(schema.clone())
1722            .primary_key_indices(vec![1])
1723            .engine("engine")
1724            .next_column_id(4)
1725            .build()
1726            .unwrap();
1727
1728        // Remove columns in reverse order to test whether timestamp index is valid.
1729        let alter_kind = AlterKind::DropColumns {
1730            names: vec![String::from("col3"), String::from("col1")],
1731        };
1732        let new_meta = meta
1733            .builder_with_alter_kind("my_table", &alter_kind)
1734            .unwrap()
1735            .build()
1736            .unwrap();
1737
1738        let names: Vec<String> = new_meta
1739            .schema
1740            .column_schemas()
1741            .iter()
1742            .map(|column_schema| column_schema.name.clone())
1743            .collect();
1744        assert_eq!(&["col2", "ts"], &names[..]);
1745        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1746        assert_eq!(&[1], &new_meta.value_indices[..]);
1747        assert_eq!(
1748            schema.timestamp_column(),
1749            new_meta.schema.timestamp_column()
1750        );
1751    }
1752
1753    #[test]
1754    fn test_add_existing_column() {
1755        let schema = Arc::new(new_test_schema());
1756        let meta = TableMetaBuilder::empty()
1757            .schema(schema)
1758            .primary_key_indices(vec![0])
1759            .engine("engine")
1760            .next_column_id(3)
1761            .build()
1762            .unwrap();
1763
1764        let alter_kind = AlterKind::AddColumns {
1765            columns: vec![AddColumnRequest {
1766                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1767                is_key: false,
1768                location: None,
1769                add_if_not_exists: false,
1770            }],
1771        };
1772
1773        let err = meta
1774            .builder_with_alter_kind("my_table", &alter_kind)
1775            .err()
1776            .unwrap();
1777        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1778
1779        // Add if not exists
1780        let alter_kind = AlterKind::AddColumns {
1781            columns: vec![AddColumnRequest {
1782                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1783                is_key: true,
1784                location: None,
1785                add_if_not_exists: true,
1786            }],
1787        };
1788        let new_meta = meta
1789            .builder_with_alter_kind("my_table", &alter_kind)
1790            .unwrap()
1791            .build()
1792            .unwrap();
1793        assert_eq!(
1794            meta.schema.column_schemas(),
1795            new_meta.schema.column_schemas()
1796        );
1797        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1798    }
1799
1800    #[test]
1801    fn test_add_different_type_column() {
1802        let schema = Arc::new(new_test_schema());
1803        let meta = TableMetaBuilder::empty()
1804            .schema(schema)
1805            .primary_key_indices(vec![0])
1806            .engine("engine")
1807            .next_column_id(3)
1808            .build()
1809            .unwrap();
1810
1811        // Add if not exists, but different type.
1812        let alter_kind = AlterKind::AddColumns {
1813            columns: vec![AddColumnRequest {
1814                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1815                is_key: false,
1816                location: None,
1817                add_if_not_exists: true,
1818            }],
1819        };
1820        let err = meta
1821            .builder_with_alter_kind("my_table", &alter_kind)
1822            .err()
1823            .unwrap();
1824        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1825    }
1826
1827    #[test]
1828    fn test_add_invalid_column() {
1829        let schema = Arc::new(new_test_schema());
1830        let meta = TableMetaBuilder::empty()
1831            .schema(schema)
1832            .primary_key_indices(vec![0])
1833            .engine("engine")
1834            .next_column_id(3)
1835            .build()
1836            .unwrap();
1837
1838        // Not nullable and no default value.
1839        let alter_kind = AlterKind::AddColumns {
1840            columns: vec![AddColumnRequest {
1841                column_schema: ColumnSchema::new(
1842                    "weny",
1843                    ConcreteDataType::string_datatype(),
1844                    false,
1845                ),
1846                is_key: false,
1847                location: None,
1848                add_if_not_exists: false,
1849            }],
1850        };
1851
1852        let err = meta
1853            .builder_with_alter_kind("my_table", &alter_kind)
1854            .err()
1855            .unwrap();
1856        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1857    }
1858
1859    #[test]
1860    fn test_remove_unknown_column() {
1861        let schema = Arc::new(new_test_schema());
1862        let meta = TableMetaBuilder::empty()
1863            .schema(schema)
1864            .primary_key_indices(vec![0])
1865            .engine("engine")
1866            .next_column_id(3)
1867            .build()
1868            .unwrap();
1869
1870        let alter_kind = AlterKind::DropColumns {
1871            names: vec![String::from("unknown")],
1872        };
1873
1874        let err = meta
1875            .builder_with_alter_kind("my_table", &alter_kind)
1876            .err()
1877            .unwrap();
1878        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1879    }
1880
1881    #[test]
1882    fn test_change_unknown_column_data_type() {
1883        let schema = Arc::new(new_test_schema());
1884        let meta = TableMetaBuilder::empty()
1885            .schema(schema)
1886            .primary_key_indices(vec![0])
1887            .engine("engine")
1888            .next_column_id(3)
1889            .build()
1890            .unwrap();
1891
1892        let alter_kind = AlterKind::ModifyColumnTypes {
1893            columns: vec![ModifyColumnTypeRequest {
1894                column_name: "unknown".to_string(),
1895                target_type: ConcreteDataType::string_datatype(),
1896            }],
1897        };
1898
1899        let err = meta
1900            .builder_with_alter_kind("my_table", &alter_kind)
1901            .err()
1902            .unwrap();
1903        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1904    }
1905
1906    #[test]
1907    fn test_remove_key_column() {
1908        let schema = Arc::new(new_test_schema());
1909        let meta = TableMetaBuilder::empty()
1910            .schema(schema)
1911            .primary_key_indices(vec![0])
1912            .engine("engine")
1913            .next_column_id(3)
1914            .build()
1915            .unwrap();
1916
1917        // Remove column in primary key.
1918        let alter_kind = AlterKind::DropColumns {
1919            names: vec![String::from("col1")],
1920        };
1921
1922        let err = meta
1923            .builder_with_alter_kind("my_table", &alter_kind)
1924            .err()
1925            .unwrap();
1926        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1927
1928        // Remove timestamp column.
1929        let alter_kind = AlterKind::DropColumns {
1930            names: vec![String::from("ts")],
1931        };
1932
1933        let err = meta
1934            .builder_with_alter_kind("my_table", &alter_kind)
1935            .err()
1936            .unwrap();
1937        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1938    }
1939
1940    #[test]
1941    fn test_remove_partition_column() {
1942        let schema = Arc::new(new_test_schema());
1943        let meta = TableMetaBuilder::empty()
1944            .schema(schema)
1945            .primary_key_indices(vec![])
1946            .partition_key_indices(vec![0])
1947            .engine("engine")
1948            .next_column_id(3)
1949            .build()
1950            .unwrap();
1951        // Remove column in primary key.
1952        let alter_kind = AlterKind::DropColumns {
1953            names: vec![String::from("col1")],
1954        };
1955
1956        let err = meta
1957            .builder_with_alter_kind("my_table", &alter_kind)
1958            .err()
1959            .unwrap();
1960        assert_matches!(err, Error::RemovePartitionColumn { .. });
1961    }
1962
1963    #[test]
1964    fn test_change_key_column_data_type() {
1965        let schema = Arc::new(new_test_schema());
1966        let meta = TableMetaBuilder::empty()
1967            .schema(schema)
1968            .primary_key_indices(vec![0])
1969            .engine("engine")
1970            .next_column_id(3)
1971            .build()
1972            .unwrap();
1973
1974        // Remove column in primary key.
1975        let alter_kind = AlterKind::ModifyColumnTypes {
1976            columns: vec![ModifyColumnTypeRequest {
1977                column_name: "col1".to_string(),
1978                target_type: ConcreteDataType::string_datatype(),
1979            }],
1980        };
1981
1982        let err = meta
1983            .builder_with_alter_kind("my_table", &alter_kind)
1984            .err()
1985            .unwrap();
1986        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1987
1988        // Remove timestamp column.
1989        let alter_kind = AlterKind::ModifyColumnTypes {
1990            columns: vec![ModifyColumnTypeRequest {
1991                column_name: "ts".to_string(),
1992                target_type: ConcreteDataType::string_datatype(),
1993            }],
1994        };
1995
1996        let err = meta
1997            .builder_with_alter_kind("my_table", &alter_kind)
1998            .err()
1999            .unwrap();
2000        assert_eq!(StatusCode::InvalidArguments, err.status_code());
2001    }
2002
2003    #[test]
2004    fn test_alloc_new_column() {
2005        let schema = Arc::new(new_test_schema());
2006        let mut meta = TableMetaBuilder::empty()
2007            .schema(schema)
2008            .primary_key_indices(vec![0])
2009            .engine("engine")
2010            .next_column_id(3)
2011            .build()
2012            .unwrap();
2013        assert_eq!(3, meta.next_column_id);
2014
2015        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
2016        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
2017
2018        assert_eq!(4, meta.next_column_id);
2019        assert_eq!(column_schema.name, desc.name);
2020    }
2021
2022    #[test]
2023    fn test_add_columns_with_location() {
2024        let schema = Arc::new(new_test_schema());
2025        let meta = TableMetaBuilder::empty()
2026            .schema(schema)
2027            .primary_key_indices(vec![0])
2028            // partition col: col1, col2
2029            .partition_key_indices(vec![0, 2])
2030            .engine("engine")
2031            .next_column_id(3)
2032            .build()
2033            .unwrap();
2034
2035        let new_meta = add_columns_to_meta_with_location(&meta);
2036        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2037
2038        let names: Vec<String> = new_meta
2039            .schema
2040            .column_schemas()
2041            .iter()
2042            .map(|column_schema| column_schema.name.clone())
2043            .collect();
2044        assert_eq!(
2045            &[
2046                "my_tag_first",               // primary key column
2047                "col1",                       // partition column
2048                "ts",                         // timestamp column
2049                "yet_another_field_after_ts", // primary key column
2050                "my_field_after_ts",          // value column
2051                "col2",                       // partition column
2052            ],
2053            &names[..]
2054        );
2055        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2056        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2057        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2058    }
2059
2060    #[test]
2061    fn test_modify_column_fulltext_options() {
2062        let schema = Arc::new(new_test_schema());
2063        let meta = TableMetaBuilder::empty()
2064            .schema(schema)
2065            .primary_key_indices(vec![0])
2066            .engine("engine")
2067            .next_column_id(3)
2068            .build()
2069            .unwrap();
2070
2071        let alter_kind = AlterKind::SetIndexes {
2072            options: vec![SetIndexOption::Fulltext {
2073                column_name: "col1".to_string(),
2074                options: FulltextOptions::default(),
2075            }],
2076        };
2077        let err = meta
2078            .builder_with_alter_kind("my_table", &alter_kind)
2079            .err()
2080            .unwrap();
2081        assert_eq!(
2082            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2083            err.to_string()
2084        );
2085
2086        // Add a string column and make it fulltext indexed
2087        let new_meta = add_columns_to_meta_with_location(&meta);
2088        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2089
2090        let alter_kind = AlterKind::SetIndexes {
2091            options: vec![SetIndexOption::Fulltext {
2092                column_name: "my_tag_first".to_string(),
2093                options: FulltextOptions::new_unchecked(
2094                    true,
2095                    FulltextAnalyzer::Chinese,
2096                    true,
2097                    FulltextBackend::Bloom,
2098                    1000,
2099                    0.01,
2100                ),
2101            }],
2102        };
2103        let new_meta = new_meta
2104            .builder_with_alter_kind("my_table", &alter_kind)
2105            .unwrap()
2106            .build()
2107            .unwrap();
2108        let column_schema = new_meta
2109            .schema
2110            .column_schema_by_name("my_tag_first")
2111            .unwrap();
2112        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2113        assert!(fulltext_options.enable);
2114        assert_eq!(
2115            datatypes::schema::FulltextAnalyzer::Chinese,
2116            fulltext_options.analyzer
2117        );
2118        assert!(fulltext_options.case_sensitive);
2119
2120        let alter_kind = AlterKind::UnsetIndexes {
2121            options: vec![UnsetIndexOption::Fulltext {
2122                column_name: "my_tag_first".to_string(),
2123            }],
2124        };
2125        let new_meta = new_meta
2126            .builder_with_alter_kind("my_table", &alter_kind)
2127            .unwrap()
2128            .build()
2129            .unwrap();
2130        let column_schema = new_meta
2131            .schema
2132            .column_schema_by_name("my_tag_first")
2133            .unwrap();
2134        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2135        assert!(!fulltext_options.enable);
2136    }
2137}