table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39    TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46/// Indicates whether and how a filter expression can be handled by a
47/// Table for table scans.
48#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50    /// The expression cannot be used by the provider.
51    Unsupported,
52    /// The expression can be used to help minimise the data retrieved,
53    /// but the provider cannot guarantee that all returned tuples
54    /// satisfy the filter. The Filter plan node containing this expression
55    /// will be preserved.
56    Inexact,
57    /// The provider guarantees that all returned data satisfies this
58    /// filter expression. The Filter plan node containing this expression
59    /// will be removed.
60    Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
83/// Indicates the type of this table for metadata/catalog purposes.
84#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86    /// An ordinary physical table.
87    Base,
88    /// A non-materialised table that itself uses a query internally to provide data.
89    View,
90    /// A transient table.
91    Temporary,
92}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105    fn from(t: TableType) -> datafusion::datasource::TableType {
106        match t {
107            TableType::Base => datafusion::datasource::TableType::Base,
108            TableType::View => datafusion::datasource::TableType::View,
109            TableType::Temporary => datafusion::datasource::TableType::Temporary,
110        }
111    }
112}
113
114/// Identifier of the table.
115#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
116pub struct TableIdent {
117    /// Unique id of this table.
118    pub table_id: TableId,
119    /// Version of the table, bumped when metadata (such as schema) of the table
120    /// being changed.
121    pub version: TableVersion,
122}
123
124/// The table metadata.
125///
126/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
127#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
128#[builder(pattern = "mutable", custom_constructor)]
129pub struct TableMeta {
130    pub schema: SchemaRef,
131    /// The indices of columns in primary key. Note that the index of timestamp column
132    /// is not included in these indices.
133    pub primary_key_indices: Vec<usize>,
134    #[builder(default = "self.default_value_indices()?")]
135    pub value_indices: Vec<usize>,
136    #[builder(default, setter(into))]
137    pub engine: String,
138    pub next_column_id: ColumnId,
139    /// Table options.
140    #[builder(default)]
141    pub options: TableOptions,
142    #[builder(default = "Utc::now()")]
143    pub created_on: DateTime<Utc>,
144    #[builder(default = "self.default_updated_on()")]
145    pub updated_on: DateTime<Utc>,
146    #[builder(default = "Vec::new()")]
147    pub partition_key_indices: Vec<usize>,
148    #[builder(default = "Vec::new()")]
149    pub column_ids: Vec<ColumnId>,
150}
151
152impl TableMetaBuilder {
153    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
154    #[cfg(any(test, feature = "testing"))]
155    pub fn empty() -> Self {
156        Self {
157            schema: None,
158            primary_key_indices: None,
159            value_indices: None,
160            engine: None,
161            next_column_id: None,
162            options: None,
163            created_on: None,
164            updated_on: None,
165            partition_key_indices: None,
166            column_ids: None,
167        }
168    }
169}
170
171impl TableMetaBuilder {
172    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
173        match (&self.primary_key_indices, &self.schema) {
174            (Some(v), Some(schema)) => {
175                let column_schemas = schema.column_schemas();
176                Ok((0..column_schemas.len())
177                    .filter(|idx| !v.contains(idx))
178                    .collect())
179            }
180            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
181        }
182    }
183
184    fn default_updated_on(&self) -> DateTime<Utc> {
185        self.created_on.unwrap_or_default()
186    }
187
188    pub fn new_external_table() -> Self {
189        Self {
190            schema: None,
191            primary_key_indices: Some(Vec::new()),
192            value_indices: Some(Vec::new()),
193            engine: None,
194            next_column_id: Some(0),
195            options: None,
196            created_on: None,
197            updated_on: None,
198            partition_key_indices: None,
199            column_ids: None,
200        }
201    }
202}
203
204/// The result after splitting requests by column location info.
205struct SplitResult<'a> {
206    /// column requests should be added at first place.
207    columns_at_first: Vec<&'a AddColumnRequest>,
208    /// column requests should be added after already exist columns.
209    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
210    /// column requests should be added at last place.
211    columns_at_last: Vec<&'a AddColumnRequest>,
212    /// all column names should be added.
213    column_names: Vec<String>,
214}
215
216impl TableMeta {
217    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
218        let columns_schemas = &self.schema.column_schemas();
219        self.primary_key_indices
220            .iter()
221            .map(|idx| &columns_schemas[*idx].name)
222    }
223
224    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
225        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
226        let columns_schemas = self.schema.column_schemas();
227        let primary_key_indices = &self.primary_key_indices;
228        columns_schemas
229            .iter()
230            .enumerate()
231            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
232            .map(|(_, cs)| &cs.name)
233    }
234
235    pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
236        let columns_schemas = &self.schema.column_schemas();
237        self.partition_key_indices
238            .iter()
239            .map(|idx| &columns_schemas[*idx].name)
240    }
241
242    pub fn partition_columns(&self) -> impl Iterator<Item = &ColumnSchema> {
243        self.partition_key_indices
244            .iter()
245            .map(|idx| &self.schema.column_schemas()[*idx])
246    }
247
248    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
249    ///
250    /// The returned builder would derive the next column id of this meta.
251    pub fn builder_with_alter_kind(
252        &self,
253        table_name: &str,
254        alter_kind: &AlterKind,
255    ) -> Result<TableMetaBuilder> {
256        let mut builder = match alter_kind {
257            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
258            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
259            AlterKind::ModifyColumnTypes { columns } => {
260                self.modify_column_types(table_name, columns)
261            }
262            // No need to rebuild table meta when renaming tables.
263            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
264            AlterKind::SetTableOptions { options } => self.set_table_options(options),
265            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
266            AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
267            AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
268            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
269            AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
270        }?;
271        let _ = builder.updated_on(Utc::now());
272        Ok(builder)
273    }
274
275    /// Creates a [TableMetaBuilder] with modified table options.
276    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
277        let mut new_options = self.options.clone();
278
279        for request in requests {
280            match request {
281                SetRegionOption::Ttl(new_ttl) => {
282                    new_options.ttl = *new_ttl;
283                }
284                SetRegionOption::Twsc(key, value) => {
285                    if !value.is_empty() {
286                        new_options.extra_options.insert(key.clone(), value.clone());
287                        // Ensure node restart correctly.
288                        new_options.extra_options.insert(
289                            COMPACTION_TYPE.to_string(),
290                            COMPACTION_TYPE_TWCS.to_string(),
291                        );
292                    } else {
293                        // Invalidate the previous change option if an empty value has been set.
294                        new_options.extra_options.remove(key.as_str());
295                    }
296                }
297                SetRegionOption::Format(value) => {
298                    new_options
299                        .extra_options
300                        .insert(SST_FORMAT_KEY.to_string(), value.clone());
301                }
302            }
303        }
304        let mut builder = self.new_meta_builder();
305        builder.options(new_options);
306
307        Ok(builder)
308    }
309
310    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
311        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
312        self.set_table_options(&requests)
313    }
314
315    fn set_indexes(
316        &self,
317        table_name: &str,
318        requests: &[SetIndexOption],
319    ) -> Result<TableMetaBuilder> {
320        let table_schema = &self.schema;
321        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
322        for request in requests {
323            let column_name = request.column_name();
324            table_schema
325                .column_index_by_name(column_name)
326                .with_context(|| error::ColumnNotExistsSnafu {
327                    column_name,
328                    table_name,
329                })?;
330            set_index_options
331                .entry(column_name)
332                .or_default()
333                .push(request);
334        }
335
336        let mut meta_builder = self.new_meta_builder();
337        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
338        for mut column in table_schema.column_schemas().iter().cloned() {
339            if let Some(request) = set_index_options.get(column.name.as_str()) {
340                for request in request {
341                    self.set_index(&mut column, request)?;
342                }
343            }
344            columns.push(column);
345        }
346
347        let mut builder = SchemaBuilder::try_from_columns(columns)
348            .with_context(|_| error::SchemaBuildSnafu {
349                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
350            })?
351            .version(table_schema.version() + 1);
352
353        for (k, v) in table_schema.metadata().iter() {
354            builder = builder.add_metadata(k, v);
355        }
356
357        let new_schema = builder.build().with_context(|_| {
358            let column_names = requests
359                .iter()
360                .map(|request| request.column_name())
361                .collect::<Vec<_>>();
362            error::SchemaBuildSnafu {
363                msg: format!(
364                    "Table {table_name} cannot set index options with columns {column_names:?}",
365                ),
366            }
367        })?;
368        let _ = meta_builder
369            .schema(Arc::new(new_schema))
370            .primary_key_indices(self.primary_key_indices.clone());
371
372        Ok(meta_builder)
373    }
374
375    fn unset_indexes(
376        &self,
377        table_name: &str,
378        requests: &[UnsetIndexOption],
379    ) -> Result<TableMetaBuilder> {
380        let table_schema = &self.schema;
381        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
382        for request in requests {
383            let column_name = request.column_name();
384            table_schema
385                .column_index_by_name(column_name)
386                .with_context(|| error::ColumnNotExistsSnafu {
387                    column_name,
388                    table_name,
389                })?;
390            set_index_options
391                .entry(column_name)
392                .or_default()
393                .push(request);
394        }
395
396        let mut meta_builder = self.new_meta_builder();
397        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
398        for mut column in table_schema.column_schemas().iter().cloned() {
399            if let Some(request) = set_index_options.get(column.name.as_str()) {
400                for request in request {
401                    self.unset_index(&mut column, request)?;
402                }
403            }
404            columns.push(column);
405        }
406
407        let mut builder = SchemaBuilder::try_from_columns(columns)
408            .with_context(|_| error::SchemaBuildSnafu {
409                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
410            })?
411            .version(table_schema.version() + 1);
412
413        for (k, v) in table_schema.metadata().iter() {
414            builder = builder.add_metadata(k, v);
415        }
416
417        let new_schema = builder.build().with_context(|_| {
418            let column_names = requests
419                .iter()
420                .map(|request| request.column_name())
421                .collect::<Vec<_>>();
422            error::SchemaBuildSnafu {
423                msg: format!(
424                    "Table {table_name} cannot set index options with columns {column_names:?}",
425                ),
426            }
427        })?;
428        let _ = meta_builder
429            .schema(Arc::new(new_schema))
430            .primary_key_indices(self.primary_key_indices.clone());
431
432        Ok(meta_builder)
433    }
434
435    fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
436        match request {
437            SetIndexOption::Fulltext {
438                column_name,
439                options,
440            } => {
441                ensure!(
442                    column_schema.data_type.is_string(),
443                    error::InvalidColumnOptionSnafu {
444                        column_name,
445                        msg: "FULLTEXT index only supports string type",
446                    }
447                );
448
449                let current_fulltext_options = column_schema
450                    .fulltext_options()
451                    .context(error::SetFulltextOptionsSnafu { column_name })?;
452                set_column_fulltext_options(
453                    column_schema,
454                    column_name,
455                    options,
456                    current_fulltext_options,
457                )?;
458            }
459            SetIndexOption::Inverted { column_name } => {
460                debug_assert_eq!(column_schema.name, *column_name);
461                column_schema.set_inverted_index(true);
462            }
463            SetIndexOption::Skipping {
464                column_name,
465                options,
466            } => {
467                set_column_skipping_index_options(column_schema, column_name, options)?;
468            }
469        }
470
471        Ok(())
472    }
473
474    fn unset_index(
475        &self,
476        column_schema: &mut ColumnSchema,
477        request: &UnsetIndexOption,
478    ) -> Result<()> {
479        match request {
480            UnsetIndexOption::Fulltext { column_name } => {
481                let current_fulltext_options = column_schema
482                    .fulltext_options()
483                    .context(error::SetFulltextOptionsSnafu { column_name })?;
484                unset_column_fulltext_options(
485                    column_schema,
486                    column_name,
487                    current_fulltext_options.clone(),
488                )?
489            }
490            UnsetIndexOption::Inverted { .. } => {
491                column_schema.set_inverted_index(false);
492            }
493            UnsetIndexOption::Skipping { column_name } => {
494                unset_column_skipping_index_options(column_schema, column_name)?;
495            }
496        }
497
498        Ok(())
499    }
500
501    // TODO(yingwen): Remove this.
502    /// Allocate a new column for the table.
503    ///
504    /// This method would bump the `next_column_id` of the meta.
505    pub fn alloc_new_column(
506        &mut self,
507        table_name: &str,
508        new_column: &ColumnSchema,
509    ) -> Result<ColumnDescriptor> {
510        let desc = ColumnDescriptorBuilder::new(
511            self.next_column_id as ColumnId,
512            &new_column.name,
513            new_column.data_type.clone(),
514        )
515        .is_nullable(new_column.is_nullable())
516        .default_constraint(new_column.default_constraint().cloned())
517        .build()
518        .context(error::BuildColumnDescriptorSnafu {
519            table_name,
520            column_name: &new_column.name,
521        })?;
522
523        // Bump next column id.
524        self.next_column_id += 1;
525
526        Ok(desc)
527    }
528
529    /// Create a [`TableMetaBuilder`] from the current TableMeta.
530    fn new_meta_builder(&self) -> TableMetaBuilder {
531        let mut builder = TableMetaBuilder::from(self);
532        // Manually remove value_indices.
533        builder.value_indices = None;
534        builder
535    }
536
537    // TODO(yingwen): Tests add if not exists.
538    fn add_columns(
539        &self,
540        table_name: &str,
541        requests: &[AddColumnRequest],
542    ) -> Result<TableMetaBuilder> {
543        let table_schema = &self.schema;
544        let mut meta_builder = self.new_meta_builder();
545        let original_primary_key_indices: HashSet<&usize> =
546            self.primary_key_indices.iter().collect();
547
548        let mut names = HashSet::with_capacity(requests.len());
549        let mut new_columns = Vec::with_capacity(requests.len());
550        for col_to_add in requests {
551            if let Some(column_schema) =
552                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
553            {
554                // If the column already exists.
555                ensure!(
556                    col_to_add.add_if_not_exists,
557                    error::ColumnExistsSnafu {
558                        table_name,
559                        column_name: &col_to_add.column_schema.name
560                    },
561                );
562
563                // Checks if the type is the same
564                ensure!(
565                    column_schema.data_type == col_to_add.column_schema.data_type,
566                    error::InvalidAlterRequestSnafu {
567                        table: table_name,
568                        err: format!(
569                            "column {} already exists with different type {:?}",
570                            col_to_add.column_schema.name, column_schema.data_type,
571                        ),
572                    }
573                );
574            } else {
575                // A new column.
576                // Ensures we only add a column once.
577                ensure!(
578                    names.insert(&col_to_add.column_schema.name),
579                    error::InvalidAlterRequestSnafu {
580                        table: table_name,
581                        err: format!(
582                            "add column {} more than once",
583                            col_to_add.column_schema.name
584                        ),
585                    }
586                );
587
588                ensure!(
589                    col_to_add.column_schema.is_nullable()
590                        || col_to_add.column_schema.default_constraint().is_some(),
591                    error::InvalidAlterRequestSnafu {
592                        table: table_name,
593                        err: format!(
594                            "no default value for column {}",
595                            col_to_add.column_schema.name
596                        ),
597                    },
598                );
599
600                new_columns.push(col_to_add.clone());
601            }
602        }
603        let requests = &new_columns[..];
604
605        let SplitResult {
606            columns_at_first,
607            columns_at_after,
608            columns_at_last,
609            column_names,
610        } = self.split_requests_by_column_location(table_name, requests)?;
611        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
612        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
613        // add new columns with FIRST, and in reverse order of requests.
614        columns_at_first.iter().rev().for_each(|request| {
615            if request.is_key {
616                // If a key column is added, we also need to store its index in primary_key_indices.
617                primary_key_indices.push(columns.len());
618            }
619            columns.push(request.column_schema.clone());
620        });
621        // add existed columns in original order and handle new columns with AFTER.
622        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
623            if original_primary_key_indices.contains(&index) {
624                primary_key_indices.push(columns.len());
625            }
626            columns.push(column_schema.clone());
627            if let Some(requests) = columns_at_after.get(&column_schema.name) {
628                requests.iter().rev().for_each(|request| {
629                    if request.is_key {
630                        // If a key column is added, we also need to store its index in primary_key_indices.
631                        primary_key_indices.push(columns.len());
632                    }
633                    columns.push(request.column_schema.clone());
634                });
635            }
636        }
637        // add new columns without location info to last.
638        columns_at_last.iter().for_each(|request| {
639            if request.is_key {
640                // If a key column is added, we also need to store its index in primary_key_indices.
641                primary_key_indices.push(columns.len());
642            }
643            columns.push(request.column_schema.clone());
644        });
645
646        let mut builder = SchemaBuilder::try_from(columns)
647            .with_context(|_| error::SchemaBuildSnafu {
648                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
649            })?
650            // Also bump the schema version.
651            .version(table_schema.version() + 1);
652        for (k, v) in table_schema.metadata().iter() {
653            builder = builder.add_metadata(k, v);
654        }
655        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
656            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
657        })?;
658
659        let partition_key_indices = self
660            .partition_key_indices
661            .iter()
662            .map(|idx| table_schema.column_name_by_index(*idx))
663            // This unwrap is safe since we only add new columns.
664            .map(|name| new_schema.column_index_by_name(name).unwrap())
665            .collect();
666
667        // value_indices would be generated automatically.
668        let _ = meta_builder
669            .schema(Arc::new(new_schema))
670            .primary_key_indices(primary_key_indices)
671            .partition_key_indices(partition_key_indices);
672
673        Ok(meta_builder)
674    }
675
676    fn remove_columns(
677        &self,
678        table_name: &str,
679        column_names: &[String],
680    ) -> Result<TableMetaBuilder> {
681        let table_schema = &self.schema;
682        let column_names: HashSet<_> = column_names.iter().collect();
683        let mut meta_builder = self.new_meta_builder();
684
685        let timestamp_index = table_schema.timestamp_index();
686        // Check whether columns are existing and not in primary key index.
687        for column_name in &column_names {
688            if let Some(index) = table_schema.column_index_by_name(column_name) {
689                // This is a linear search, but since there won't be too much columns, the performance should
690                // be acceptable.
691                ensure!(
692                    !self.primary_key_indices.contains(&index),
693                    error::RemoveColumnInIndexSnafu {
694                        column_name: *column_name,
695                        table_name,
696                    }
697                );
698
699                ensure!(
700                    !self.partition_key_indices.contains(&index),
701                    error::RemovePartitionColumnSnafu {
702                        column_name: *column_name,
703                        table_name,
704                    }
705                );
706
707                if let Some(ts_index) = timestamp_index {
708                    // Not allowed to remove column in timestamp index.
709                    ensure!(
710                        index != ts_index,
711                        error::RemoveColumnInIndexSnafu {
712                            column_name: table_schema.column_name_by_index(ts_index),
713                            table_name,
714                        }
715                    );
716                }
717            } else {
718                return error::ColumnNotExistsSnafu {
719                    column_name: *column_name,
720                    table_name,
721                }
722                .fail()?;
723            }
724        }
725
726        // Collect columns after removal.
727        let columns: Vec<_> = table_schema
728            .column_schemas()
729            .iter()
730            .filter(|column_schema| !column_names.contains(&column_schema.name))
731            .cloned()
732            .collect();
733
734        let mut builder = SchemaBuilder::try_from_columns(columns)
735            .with_context(|_| error::SchemaBuildSnafu {
736                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
737            })?
738            // Also bump the schema version.
739            .version(table_schema.version() + 1);
740        for (k, v) in table_schema.metadata().iter() {
741            builder = builder.add_metadata(k, v);
742        }
743        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
744            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
745        })?;
746
747        // Rebuild the indices of primary key columns.
748        let primary_key_indices = self
749            .primary_key_indices
750            .iter()
751            .map(|idx| table_schema.column_name_by_index(*idx))
752            // This unwrap is safe since we don't allow removing a primary key column.
753            .map(|name| new_schema.column_index_by_name(name).unwrap())
754            .collect();
755
756        let partition_key_indices = self
757            .partition_key_indices
758            .iter()
759            .map(|idx| table_schema.column_name_by_index(*idx))
760            // This unwrap is safe since we don't allow removing a partition key column.
761            .map(|name| new_schema.column_index_by_name(name).unwrap())
762            .collect();
763
764        let _ = meta_builder
765            .schema(Arc::new(new_schema))
766            .primary_key_indices(primary_key_indices)
767            .partition_key_indices(partition_key_indices);
768
769        Ok(meta_builder)
770    }
771
772    fn modify_column_types(
773        &self,
774        table_name: &str,
775        requests: &[ModifyColumnTypeRequest],
776    ) -> Result<TableMetaBuilder> {
777        let table_schema = &self.schema;
778        let mut meta_builder = self.new_meta_builder();
779
780        let mut modify_column_types = HashMap::with_capacity(requests.len());
781        let timestamp_index = table_schema.timestamp_index();
782
783        for col_to_change in requests {
784            let change_column_name = &col_to_change.column_name;
785
786            let index = table_schema
787                .column_index_by_name(change_column_name)
788                .with_context(|| error::ColumnNotExistsSnafu {
789                    column_name: change_column_name,
790                    table_name,
791                })?;
792
793            let column = &table_schema.column_schemas()[index];
794
795            ensure!(
796                !self.primary_key_indices.contains(&index),
797                error::InvalidAlterRequestSnafu {
798                    table: table_name,
799                    err: format!(
800                        "Not allowed to change primary key index column '{}'",
801                        column.name
802                    )
803                }
804            );
805
806            if let Some(ts_index) = timestamp_index {
807                // Not allowed to change column datatype in timestamp index.
808                ensure!(
809                    index != ts_index,
810                    error::InvalidAlterRequestSnafu {
811                        table: table_name,
812                        err: format!(
813                            "Not allowed to change timestamp index column '{}' datatype",
814                            column.name
815                        )
816                    }
817                );
818            }
819
820            ensure!(
821                modify_column_types
822                    .insert(&col_to_change.column_name, col_to_change)
823                    .is_none(),
824                error::InvalidAlterRequestSnafu {
825                    table: table_name,
826                    err: format!(
827                        "change column datatype {} more than once",
828                        col_to_change.column_name
829                    ),
830                }
831            );
832
833            ensure!(
834                column
835                    .data_type
836                    .can_arrow_type_cast_to(&col_to_change.target_type),
837                error::InvalidAlterRequestSnafu {
838                    table: table_name,
839                    err: format!(
840                        "column '{}' cannot be cast automatically to type '{}'",
841                        col_to_change.column_name, col_to_change.target_type,
842                    ),
843                }
844            );
845
846            ensure!(
847                column.is_nullable(),
848                error::InvalidAlterRequestSnafu {
849                    table: table_name,
850                    err: format!(
851                        "column '{}' must be nullable to ensure safe conversion.",
852                        col_to_change.column_name,
853                    ),
854                }
855            );
856        }
857        // Collect columns after changed.
858
859        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
860        for mut column in table_schema.column_schemas().iter().cloned() {
861            if let Some(change_column) = modify_column_types.get(&column.name) {
862                column.data_type = change_column.target_type.clone();
863                let new_default = if let Some(default_value) = column.default_constraint() {
864                    Some(
865                        default_value
866                            .cast_to_datatype(&change_column.target_type)
867                            .with_context(|_| error::CastDefaultValueSnafu {
868                                reason: format!(
869                                    "Failed to cast default value from {:?} to type {:?}",
870                                    default_value, &change_column.target_type
871                                ),
872                            })?,
873                    )
874                } else {
875                    None
876                };
877                column = column
878                    .clone()
879                    .with_default_constraint(new_default.clone())
880                    .with_context(|_| error::CastDefaultValueSnafu {
881                        reason: format!("Failed to set new default: {:?}", new_default),
882                    })?;
883            }
884            columns.push(column)
885        }
886
887        let mut builder = SchemaBuilder::try_from_columns(columns)
888            .with_context(|_| error::SchemaBuildSnafu {
889                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
890            })?
891            // Also bump the schema version.
892            .version(table_schema.version() + 1);
893        for (k, v) in table_schema.metadata().iter() {
894            builder = builder.add_metadata(k, v);
895        }
896        let new_schema = builder.build().with_context(|_| {
897            let column_names: Vec<_> = requests
898                .iter()
899                .map(|request| &request.column_name)
900                .collect();
901
902            error::SchemaBuildSnafu {
903                msg: format!(
904                    "Table {table_name} cannot change datatype with columns {column_names:?}"
905                ),
906            }
907        })?;
908
909        let _ = meta_builder
910            .schema(Arc::new(new_schema))
911            .primary_key_indices(self.primary_key_indices.clone());
912
913        Ok(meta_builder)
914    }
915
916    /// Split requests into different groups using column location info.
917    fn split_requests_by_column_location<'a>(
918        &self,
919        table_name: &str,
920        requests: &'a [AddColumnRequest],
921    ) -> Result<SplitResult<'a>> {
922        let table_schema = &self.schema;
923        let mut columns_at_first = Vec::new();
924        let mut columns_at_after = HashMap::new();
925        let mut columns_at_last = Vec::new();
926        let mut column_names = Vec::with_capacity(requests.len());
927        for request in requests {
928            // Check whether columns to add are already existing.
929            let column_name = &request.column_schema.name;
930            column_names.push(column_name.clone());
931            ensure!(
932                table_schema.column_schema_by_name(column_name).is_none(),
933                error::ColumnExistsSnafu {
934                    column_name,
935                    table_name,
936                }
937            );
938            match request.location.as_ref() {
939                Some(AddColumnLocation::First) => {
940                    columns_at_first.push(request);
941                }
942                Some(AddColumnLocation::After { column_name }) => {
943                    ensure!(
944                        table_schema.column_schema_by_name(column_name).is_some(),
945                        error::ColumnNotExistsSnafu {
946                            column_name,
947                            table_name,
948                        }
949                    );
950                    columns_at_after
951                        .entry(column_name.clone())
952                        .or_insert(Vec::new())
953                        .push(request);
954                }
955                None => {
956                    columns_at_last.push(request);
957                }
958            }
959        }
960        Ok(SplitResult {
961            columns_at_first,
962            columns_at_after,
963            columns_at_last,
964            column_names,
965        })
966    }
967
968    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
969        let table_schema = &self.schema;
970        let mut meta_builder = self.new_meta_builder();
971        let mut columns = Vec::with_capacity(table_schema.num_columns());
972        for column_schema in table_schema.column_schemas() {
973            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
974                // Drop default constraint.
975                ensure!(
976                    column_schema.default_constraint().is_some(),
977                    error::InvalidAlterRequestSnafu {
978                        table: table_name,
979                        err: format!("column {name} does not have a default value"),
980                    }
981                );
982                if !column_schema.is_nullable() {
983                    return error::InvalidAlterRequestSnafu {
984                        table: table_name,
985                        err: format!(
986                            "column {name} is not nullable and `default` cannot be dropped",
987                        ),
988                    }
989                    .fail();
990                }
991                let new_column_schema = column_schema.clone();
992                let new_column_schema = new_column_schema
993                    .with_default_constraint(None)
994                    .with_context(|_| error::SchemaBuildSnafu {
995                        msg: format!("Table {table_name} cannot drop default values"),
996                    })?;
997                columns.push(new_column_schema);
998            } else {
999                columns.push(column_schema.clone());
1000            }
1001        }
1002
1003        let mut builder = SchemaBuilder::try_from_columns(columns)
1004            .with_context(|_| error::SchemaBuildSnafu {
1005                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1006            })?
1007            // Also bump the schema version.
1008            .version(table_schema.version() + 1);
1009        for (k, v) in table_schema.metadata().iter() {
1010            builder = builder.add_metadata(k, v);
1011        }
1012        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1013            msg: format!("Table {table_name} cannot drop default values"),
1014        })?;
1015
1016        let _ = meta_builder.schema(Arc::new(new_schema));
1017
1018        Ok(meta_builder)
1019    }
1020
1021    fn set_defaults(
1022        &self,
1023        table_name: &str,
1024        set_defaults: &[SetDefaultRequest],
1025    ) -> Result<TableMetaBuilder> {
1026        let table_schema = &self.schema;
1027        let mut meta_builder = self.new_meta_builder();
1028        let mut columns = Vec::with_capacity(table_schema.num_columns());
1029        for column_schema in table_schema.column_schemas() {
1030            if let Some(set_default) = set_defaults
1031                .iter()
1032                .find(|s| s.column_name == column_schema.name)
1033            {
1034                let new_column_schema = column_schema.clone();
1035                let new_column_schema = new_column_schema
1036                    .with_default_constraint(set_default.default_constraint.clone())
1037                    .with_context(|_| error::SchemaBuildSnafu {
1038                        msg: format!("Table {table_name} cannot set default values"),
1039                    })?;
1040                columns.push(new_column_schema);
1041            } else {
1042                columns.push(column_schema.clone());
1043            }
1044        }
1045
1046        let mut builder = SchemaBuilder::try_from_columns(columns)
1047            .with_context(|_| error::SchemaBuildSnafu {
1048                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1049            })?
1050            // Also bump the schema version.
1051            .version(table_schema.version() + 1);
1052        for (k, v) in table_schema.metadata().iter() {
1053            builder = builder.add_metadata(k, v);
1054        }
1055        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1056            msg: format!("Table {table_name} cannot set default values"),
1057        })?;
1058
1059        let _ = meta_builder.schema(Arc::new(new_schema));
1060
1061        Ok(meta_builder)
1062    }
1063}
1064
1065#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1066#[builder(pattern = "owned")]
1067pub struct TableInfo {
1068    /// Id and version of the table.
1069    #[builder(default, setter(into))]
1070    pub ident: TableIdent,
1071    /// Name of the table.
1072    #[builder(setter(into))]
1073    pub name: String,
1074    /// Comment of the table.
1075    #[builder(default, setter(into))]
1076    pub desc: Option<String>,
1077    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1078    pub catalog_name: String,
1079    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1080    pub schema_name: String,
1081    pub meta: TableMeta,
1082    #[builder(default = "TableType::Base")]
1083    pub table_type: TableType,
1084}
1085
1086pub type TableInfoRef = Arc<TableInfo>;
1087
1088impl TableInfo {
1089    pub fn table_id(&self) -> TableId {
1090        self.ident.table_id
1091    }
1092
1093    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1094    pub fn full_table_name(&self) -> String {
1095        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1096    }
1097
1098    pub fn get_db_string(&self) -> String {
1099        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1100    }
1101
1102    /// Returns true when the table is the metric engine's physical table.
1103    pub fn is_physical_table(&self) -> bool {
1104        self.meta
1105            .options
1106            .extra_options
1107            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1108    }
1109
1110    /// Return true if the table's TTL is `instant`.
1111    pub fn is_ttl_instant_table(&self) -> bool {
1112        self.meta
1113            .options
1114            .ttl
1115            .map(|t| t.is_instant())
1116            .unwrap_or(false)
1117    }
1118}
1119
1120impl TableInfoBuilder {
1121    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1122        Self {
1123            name: Some(name.into()),
1124            meta: Some(meta),
1125            ..Default::default()
1126        }
1127    }
1128
1129    pub fn table_id(mut self, id: TableId) -> Self {
1130        let ident = self.ident.get_or_insert_with(TableIdent::default);
1131        ident.table_id = id;
1132        self
1133    }
1134
1135    pub fn table_version(mut self, version: TableVersion) -> Self {
1136        let ident = self.ident.get_or_insert_with(TableIdent::default);
1137        ident.version = version;
1138        self
1139    }
1140}
1141
1142impl TableIdent {
1143    pub fn new(table_id: TableId) -> Self {
1144        Self {
1145            table_id,
1146            version: 0,
1147        }
1148    }
1149}
1150
1151impl From<TableId> for TableIdent {
1152    fn from(table_id: TableId) -> Self {
1153        Self::new(table_id)
1154    }
1155}
1156
1157/// Struct used to serialize and deserialize [`TableMeta`].
1158#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
1159pub struct RawTableMeta {
1160    pub schema: RawSchema,
1161    /// The indices of columns in primary key. Note that the index of timestamp column
1162    /// is not included. Order matters to this array.
1163    pub primary_key_indices: Vec<usize>,
1164    ///  The indices of columns in value. The index of timestamp column is included.
1165    /// Order doesn't matter to this array.
1166    pub value_indices: Vec<usize>,
1167    /// Engine type of this table. Usually in small case.
1168    pub engine: String,
1169    /// Next column id of a new column.
1170    /// It's used to ensure all columns with the same name across all regions have the same column id.
1171    pub next_column_id: ColumnId,
1172    pub options: TableOptions,
1173    pub created_on: DateTime<Utc>,
1174    pub updated_on: DateTime<Utc>,
1175    /// Order doesn't matter to this array.
1176    #[serde(default)]
1177    pub partition_key_indices: Vec<usize>,
1178    /// Map of column name to column id.
1179    /// Note: This field may be empty for older versions that did not include this field.
1180    #[serde(default)]
1181    pub column_ids: Vec<ColumnId>,
1182}
1183
1184impl<'de> Deserialize<'de> for RawTableMeta {
1185    fn deserialize<D>(
1186        deserializer: D,
1187    ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1188    where
1189        D: Deserializer<'de>,
1190    {
1191        #[derive(Deserialize)]
1192        struct Helper {
1193            schema: RawSchema,
1194            primary_key_indices: Vec<usize>,
1195            value_indices: Vec<usize>,
1196            engine: String,
1197            next_column_id: u32,
1198            options: TableOptions,
1199            created_on: DateTime<Utc>,
1200            updated_on: Option<DateTime<Utc>>,
1201            #[serde(default)]
1202            partition_key_indices: Vec<usize>,
1203            #[serde(default)]
1204            column_ids: Vec<ColumnId>,
1205        }
1206
1207        let h = Helper::deserialize(deserializer)?;
1208        Ok(RawTableMeta {
1209            schema: h.schema,
1210            primary_key_indices: h.primary_key_indices,
1211            value_indices: h.value_indices,
1212            engine: h.engine,
1213            next_column_id: h.next_column_id,
1214            options: h.options,
1215            created_on: h.created_on,
1216            updated_on: h.updated_on.unwrap_or(h.created_on),
1217            partition_key_indices: h.partition_key_indices,
1218            column_ids: h.column_ids,
1219        })
1220    }
1221}
1222
1223impl From<TableMeta> for RawTableMeta {
1224    fn from(meta: TableMeta) -> RawTableMeta {
1225        RawTableMeta {
1226            schema: RawSchema::from(&*meta.schema),
1227            primary_key_indices: meta.primary_key_indices,
1228            value_indices: meta.value_indices,
1229            engine: meta.engine,
1230            next_column_id: meta.next_column_id,
1231            options: meta.options,
1232            created_on: meta.created_on,
1233            updated_on: meta.updated_on,
1234            partition_key_indices: meta.partition_key_indices,
1235            column_ids: meta.column_ids,
1236        }
1237    }
1238}
1239
1240impl TryFrom<RawTableMeta> for TableMeta {
1241    type Error = ConvertError;
1242
1243    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1244        Ok(TableMeta {
1245            schema: Arc::new(Schema::try_from(raw.schema)?),
1246            primary_key_indices: raw.primary_key_indices,
1247            value_indices: raw.value_indices,
1248            engine: raw.engine,
1249            next_column_id: raw.next_column_id,
1250            options: raw.options,
1251            created_on: raw.created_on,
1252            updated_on: raw.updated_on,
1253            partition_key_indices: raw.partition_key_indices,
1254            column_ids: raw.column_ids,
1255        })
1256    }
1257}
1258
1259/// Struct used to serialize and deserialize [`TableInfo`].
1260#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1261pub struct RawTableInfo {
1262    pub ident: TableIdent,
1263    pub name: String,
1264    pub desc: Option<String>,
1265    pub catalog_name: String,
1266    pub schema_name: String,
1267    pub meta: RawTableMeta,
1268    pub table_type: TableType,
1269}
1270
1271impl RawTableInfo {
1272    /// Returns the map of column name to column id.
1273    ///
1274    /// Note: This method may return an empty map for older versions that did not include this field.
1275    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1276        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1277            None
1278        } else {
1279            Some(
1280                self.meta
1281                    .column_ids
1282                    .iter()
1283                    .enumerate()
1284                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1285                    .collect(),
1286            )
1287        }
1288    }
1289
1290    /// Sort the columns in [RawTableInfo], logical tables require it.
1291    pub fn sort_columns(&mut self) {
1292        let column_schemas = &self.meta.schema.column_schemas;
1293        let primary_keys = self
1294            .meta
1295            .primary_key_indices
1296            .iter()
1297            .map(|index| column_schemas[*index].name.clone())
1298            .collect::<HashSet<_>>();
1299
1300        let name_to_ids = self.name_to_ids().unwrap_or_default();
1301        self.meta
1302            .schema
1303            .column_schemas
1304            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1305
1306        // Compute new indices of sorted columns
1307        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1308        let mut timestamp_index = None;
1309        let mut value_indices =
1310            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1311        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1312        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1313            if primary_keys.contains(&column_schema.name) {
1314                primary_key_indices.push(index);
1315            } else if column_schema.is_time_index() {
1316                value_indices.push(index);
1317                timestamp_index = Some(index);
1318            } else {
1319                value_indices.push(index);
1320            }
1321            if let Some(id) = name_to_ids.get(&column_schema.name) {
1322                column_ids.push(*id);
1323            }
1324        }
1325
1326        // Overwrite table meta
1327        self.meta.schema.timestamp_index = timestamp_index;
1328        self.meta.primary_key_indices = primary_key_indices;
1329        self.meta.value_indices = value_indices;
1330        self.meta.column_ids = column_ids;
1331    }
1332
1333    /// Extracts region options from table info.
1334    ///
1335    /// All "region options" are actually a copy of table options for redundancy.
1336    pub fn to_region_options(&self) -> HashMap<String, String> {
1337        HashMap::from(&self.meta.options)
1338    }
1339
1340    /// Returns the table reference.
1341    pub fn table_ref(&self) -> TableReference<'_> {
1342        TableReference::full(
1343            self.catalog_name.as_str(),
1344            self.schema_name.as_str(),
1345            self.name.as_str(),
1346        )
1347    }
1348}
1349
1350impl From<TableInfo> for RawTableInfo {
1351    fn from(info: TableInfo) -> RawTableInfo {
1352        RawTableInfo {
1353            ident: info.ident,
1354            name: info.name,
1355            desc: info.desc,
1356            catalog_name: info.catalog_name,
1357            schema_name: info.schema_name,
1358            meta: RawTableMeta::from(info.meta),
1359            table_type: info.table_type,
1360        }
1361    }
1362}
1363
1364impl TryFrom<RawTableInfo> for TableInfo {
1365    type Error = ConvertError;
1366
1367    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1368        Ok(TableInfo {
1369            ident: raw.ident,
1370            name: raw.name,
1371            desc: raw.desc,
1372            catalog_name: raw.catalog_name,
1373            schema_name: raw.schema_name,
1374            meta: TableMeta::try_from(raw.meta)?,
1375            table_type: raw.table_type,
1376        })
1377    }
1378}
1379
1380/// Set column fulltext options if it passed the validation.
1381///
1382/// Options allowed to modify:
1383/// * backend
1384///
1385/// Options not allowed to modify:
1386/// * analyzer
1387/// * case_sensitive
1388fn set_column_fulltext_options(
1389    column_schema: &mut ColumnSchema,
1390    column_name: &str,
1391    options: &FulltextOptions,
1392    current_options: Option<FulltextOptions>,
1393) -> Result<()> {
1394    if let Some(current_options) = current_options {
1395        ensure!(
1396            current_options.analyzer == options.analyzer
1397                && current_options.case_sensitive == options.case_sensitive,
1398            error::InvalidColumnOptionSnafu {
1399                column_name,
1400                msg: format!(
1401                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1402                    current_options.analyzer, current_options.case_sensitive
1403                ),
1404            }
1405        );
1406    }
1407
1408    column_schema
1409        .set_fulltext_options(options)
1410        .context(error::SetFulltextOptionsSnafu { column_name })?;
1411
1412    Ok(())
1413}
1414
1415fn unset_column_fulltext_options(
1416    column_schema: &mut ColumnSchema,
1417    column_name: &str,
1418    current_options: Option<FulltextOptions>,
1419) -> Result<()> {
1420    ensure!(
1421        current_options
1422            .as_ref()
1423            .is_some_and(|options| options.enable),
1424        error::InvalidColumnOptionSnafu {
1425            column_name,
1426            msg: "FULLTEXT index already disabled".to_string(),
1427        }
1428    );
1429
1430    let mut options = current_options.unwrap();
1431    options.enable = false;
1432    column_schema
1433        .set_fulltext_options(&options)
1434        .context(error::SetFulltextOptionsSnafu { column_name })?;
1435
1436    Ok(())
1437}
1438
1439fn set_column_skipping_index_options(
1440    column_schema: &mut ColumnSchema,
1441    column_name: &str,
1442    options: &SkippingIndexOptions,
1443) -> Result<()> {
1444    column_schema
1445        .set_skipping_options(options)
1446        .context(error::SetSkippingOptionsSnafu { column_name })?;
1447
1448    Ok(())
1449}
1450
1451fn unset_column_skipping_index_options(
1452    column_schema: &mut ColumnSchema,
1453    column_name: &str,
1454) -> Result<()> {
1455    column_schema
1456        .unset_skipping_options()
1457        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1458    Ok(())
1459}
1460
1461#[cfg(test)]
1462mod tests {
1463    use std::assert_matches::assert_matches;
1464
1465    use common_error::ext::ErrorExt;
1466    use common_error::status_code::StatusCode;
1467    use datatypes::data_type::ConcreteDataType;
1468    use datatypes::schema::{
1469        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1470    };
1471
1472    use super::*;
1473    use crate::Error;
1474
1475    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1476    fn new_test_schema() -> Schema {
1477        let column_schemas = vec![
1478            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1479            ColumnSchema::new(
1480                "ts",
1481                ConcreteDataType::timestamp_millisecond_datatype(),
1482                false,
1483            )
1484            .with_time_index(true),
1485            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1486        ];
1487        SchemaBuilder::try_from(column_schemas)
1488            .unwrap()
1489            .version(123)
1490            .build()
1491            .unwrap()
1492    }
1493
1494    #[test]
1495    fn test_raw_convert() {
1496        let schema = Arc::new(new_test_schema());
1497        let meta = TableMetaBuilder::empty()
1498            .schema(schema)
1499            .primary_key_indices(vec![0])
1500            .engine("engine")
1501            .next_column_id(3)
1502            .build()
1503            .unwrap();
1504        let info = TableInfoBuilder::default()
1505            .table_id(10)
1506            .table_version(5)
1507            .name("mytable")
1508            .meta(meta)
1509            .build()
1510            .unwrap();
1511
1512        let raw = RawTableInfo::from(info.clone());
1513        let info_new = TableInfo::try_from(raw).unwrap();
1514
1515        assert_eq!(info, info_new);
1516    }
1517
1518    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1519        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1520        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1521        let alter_kind = AlterKind::AddColumns {
1522            columns: vec![
1523                AddColumnRequest {
1524                    column_schema: new_tag,
1525                    is_key: true,
1526                    location: None,
1527                    add_if_not_exists: false,
1528                },
1529                AddColumnRequest {
1530                    column_schema: new_field,
1531                    is_key: false,
1532                    location: None,
1533                    add_if_not_exists: false,
1534                },
1535            ],
1536        };
1537
1538        let builder = meta
1539            .builder_with_alter_kind("my_table", &alter_kind)
1540            .unwrap();
1541        builder.build().unwrap()
1542    }
1543
1544    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1545        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1546        let new_field = ColumnSchema::new(
1547            "my_field_after_ts",
1548            ConcreteDataType::string_datatype(),
1549            true,
1550        );
1551        let yet_another_field = ColumnSchema::new(
1552            "yet_another_field_after_ts",
1553            ConcreteDataType::int64_datatype(),
1554            true,
1555        );
1556        let alter_kind = AlterKind::AddColumns {
1557            columns: vec![
1558                AddColumnRequest {
1559                    column_schema: new_tag,
1560                    is_key: true,
1561                    location: Some(AddColumnLocation::First),
1562                    add_if_not_exists: false,
1563                },
1564                AddColumnRequest {
1565                    column_schema: new_field,
1566                    is_key: false,
1567                    location: Some(AddColumnLocation::After {
1568                        column_name: "ts".to_string(),
1569                    }),
1570                    add_if_not_exists: false,
1571                },
1572                AddColumnRequest {
1573                    column_schema: yet_another_field,
1574                    is_key: true,
1575                    location: Some(AddColumnLocation::After {
1576                        column_name: "ts".to_string(),
1577                    }),
1578                    add_if_not_exists: false,
1579                },
1580            ],
1581        };
1582
1583        let builder = meta
1584            .builder_with_alter_kind("my_table", &alter_kind)
1585            .unwrap();
1586        builder.build().unwrap()
1587    }
1588
1589    #[test]
1590    fn test_add_columns() {
1591        let schema = Arc::new(new_test_schema());
1592        let meta = TableMetaBuilder::empty()
1593            .schema(schema)
1594            .primary_key_indices(vec![0])
1595            .engine("engine")
1596            .next_column_id(3)
1597            .build()
1598            .unwrap();
1599
1600        let new_meta = add_columns_to_meta(&meta);
1601        let names: Vec<String> = new_meta
1602            .schema
1603            .column_schemas()
1604            .iter()
1605            .map(|column_schema| column_schema.name.clone())
1606            .collect();
1607        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1608        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1609        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1610    }
1611
1612    #[test]
1613    fn test_add_columns_multiple_times() {
1614        let schema = Arc::new(new_test_schema());
1615        let meta = TableMetaBuilder::empty()
1616            .schema(schema)
1617            .primary_key_indices(vec![0])
1618            .engine("engine")
1619            .next_column_id(3)
1620            .build()
1621            .unwrap();
1622
1623        let alter_kind = AlterKind::AddColumns {
1624            columns: vec![
1625                AddColumnRequest {
1626                    column_schema: ColumnSchema::new(
1627                        "col3",
1628                        ConcreteDataType::int32_datatype(),
1629                        true,
1630                    ),
1631                    is_key: true,
1632                    location: None,
1633                    add_if_not_exists: true,
1634                },
1635                AddColumnRequest {
1636                    column_schema: ColumnSchema::new(
1637                        "col3",
1638                        ConcreteDataType::int32_datatype(),
1639                        true,
1640                    ),
1641                    is_key: true,
1642                    location: None,
1643                    add_if_not_exists: true,
1644                },
1645            ],
1646        };
1647        let err = meta
1648            .builder_with_alter_kind("my_table", &alter_kind)
1649            .err()
1650            .unwrap();
1651        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1652    }
1653
1654    #[test]
1655    fn test_remove_columns() {
1656        let schema = Arc::new(new_test_schema());
1657        let meta = TableMetaBuilder::empty()
1658            .schema(schema.clone())
1659            .primary_key_indices(vec![0])
1660            .engine("engine")
1661            .next_column_id(3)
1662            .build()
1663            .unwrap();
1664        // Add more columns so we have enough candidate columns to remove.
1665        let meta = add_columns_to_meta(&meta);
1666
1667        let alter_kind = AlterKind::DropColumns {
1668            names: vec![String::from("col2"), String::from("my_field")],
1669        };
1670        let new_meta = meta
1671            .builder_with_alter_kind("my_table", &alter_kind)
1672            .unwrap()
1673            .build()
1674            .unwrap();
1675
1676        let names: Vec<String> = new_meta
1677            .schema
1678            .column_schemas()
1679            .iter()
1680            .map(|column_schema| column_schema.name.clone())
1681            .collect();
1682        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1683        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1684        assert_eq!(&[1], &new_meta.value_indices[..]);
1685        assert_eq!(
1686            schema.timestamp_column(),
1687            new_meta.schema.timestamp_column()
1688        );
1689    }
1690
1691    #[test]
1692    fn test_remove_multiple_columns_before_timestamp() {
1693        let column_schemas = vec![
1694            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1695            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1696            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1697            ColumnSchema::new(
1698                "ts",
1699                ConcreteDataType::timestamp_millisecond_datatype(),
1700                false,
1701            )
1702            .with_time_index(true),
1703        ];
1704        let schema = Arc::new(
1705            SchemaBuilder::try_from(column_schemas)
1706                .unwrap()
1707                .version(123)
1708                .build()
1709                .unwrap(),
1710        );
1711        let meta = TableMetaBuilder::empty()
1712            .schema(schema.clone())
1713            .primary_key_indices(vec![1])
1714            .engine("engine")
1715            .next_column_id(4)
1716            .build()
1717            .unwrap();
1718
1719        // Remove columns in reverse order to test whether timestamp index is valid.
1720        let alter_kind = AlterKind::DropColumns {
1721            names: vec![String::from("col3"), String::from("col1")],
1722        };
1723        let new_meta = meta
1724            .builder_with_alter_kind("my_table", &alter_kind)
1725            .unwrap()
1726            .build()
1727            .unwrap();
1728
1729        let names: Vec<String> = new_meta
1730            .schema
1731            .column_schemas()
1732            .iter()
1733            .map(|column_schema| column_schema.name.clone())
1734            .collect();
1735        assert_eq!(&["col2", "ts"], &names[..]);
1736        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1737        assert_eq!(&[1], &new_meta.value_indices[..]);
1738        assert_eq!(
1739            schema.timestamp_column(),
1740            new_meta.schema.timestamp_column()
1741        );
1742    }
1743
1744    #[test]
1745    fn test_add_existing_column() {
1746        let schema = Arc::new(new_test_schema());
1747        let meta = TableMetaBuilder::empty()
1748            .schema(schema)
1749            .primary_key_indices(vec![0])
1750            .engine("engine")
1751            .next_column_id(3)
1752            .build()
1753            .unwrap();
1754
1755        let alter_kind = AlterKind::AddColumns {
1756            columns: vec![AddColumnRequest {
1757                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1758                is_key: false,
1759                location: None,
1760                add_if_not_exists: false,
1761            }],
1762        };
1763
1764        let err = meta
1765            .builder_with_alter_kind("my_table", &alter_kind)
1766            .err()
1767            .unwrap();
1768        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1769
1770        // Add if not exists
1771        let alter_kind = AlterKind::AddColumns {
1772            columns: vec![AddColumnRequest {
1773                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1774                is_key: true,
1775                location: None,
1776                add_if_not_exists: true,
1777            }],
1778        };
1779        let new_meta = meta
1780            .builder_with_alter_kind("my_table", &alter_kind)
1781            .unwrap()
1782            .build()
1783            .unwrap();
1784        assert_eq!(
1785            meta.schema.column_schemas(),
1786            new_meta.schema.column_schemas()
1787        );
1788        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1789    }
1790
1791    #[test]
1792    fn test_add_different_type_column() {
1793        let schema = Arc::new(new_test_schema());
1794        let meta = TableMetaBuilder::empty()
1795            .schema(schema)
1796            .primary_key_indices(vec![0])
1797            .engine("engine")
1798            .next_column_id(3)
1799            .build()
1800            .unwrap();
1801
1802        // Add if not exists, but different type.
1803        let alter_kind = AlterKind::AddColumns {
1804            columns: vec![AddColumnRequest {
1805                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1806                is_key: false,
1807                location: None,
1808                add_if_not_exists: true,
1809            }],
1810        };
1811        let err = meta
1812            .builder_with_alter_kind("my_table", &alter_kind)
1813            .err()
1814            .unwrap();
1815        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1816    }
1817
1818    #[test]
1819    fn test_add_invalid_column() {
1820        let schema = Arc::new(new_test_schema());
1821        let meta = TableMetaBuilder::empty()
1822            .schema(schema)
1823            .primary_key_indices(vec![0])
1824            .engine("engine")
1825            .next_column_id(3)
1826            .build()
1827            .unwrap();
1828
1829        // Not nullable and no default value.
1830        let alter_kind = AlterKind::AddColumns {
1831            columns: vec![AddColumnRequest {
1832                column_schema: ColumnSchema::new(
1833                    "weny",
1834                    ConcreteDataType::string_datatype(),
1835                    false,
1836                ),
1837                is_key: false,
1838                location: None,
1839                add_if_not_exists: false,
1840            }],
1841        };
1842
1843        let err = meta
1844            .builder_with_alter_kind("my_table", &alter_kind)
1845            .err()
1846            .unwrap();
1847        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1848    }
1849
1850    #[test]
1851    fn test_remove_unknown_column() {
1852        let schema = Arc::new(new_test_schema());
1853        let meta = TableMetaBuilder::empty()
1854            .schema(schema)
1855            .primary_key_indices(vec![0])
1856            .engine("engine")
1857            .next_column_id(3)
1858            .build()
1859            .unwrap();
1860
1861        let alter_kind = AlterKind::DropColumns {
1862            names: vec![String::from("unknown")],
1863        };
1864
1865        let err = meta
1866            .builder_with_alter_kind("my_table", &alter_kind)
1867            .err()
1868            .unwrap();
1869        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1870    }
1871
1872    #[test]
1873    fn test_change_unknown_column_data_type() {
1874        let schema = Arc::new(new_test_schema());
1875        let meta = TableMetaBuilder::empty()
1876            .schema(schema)
1877            .primary_key_indices(vec![0])
1878            .engine("engine")
1879            .next_column_id(3)
1880            .build()
1881            .unwrap();
1882
1883        let alter_kind = AlterKind::ModifyColumnTypes {
1884            columns: vec![ModifyColumnTypeRequest {
1885                column_name: "unknown".to_string(),
1886                target_type: ConcreteDataType::string_datatype(),
1887            }],
1888        };
1889
1890        let err = meta
1891            .builder_with_alter_kind("my_table", &alter_kind)
1892            .err()
1893            .unwrap();
1894        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1895    }
1896
1897    #[test]
1898    fn test_remove_key_column() {
1899        let schema = Arc::new(new_test_schema());
1900        let meta = TableMetaBuilder::empty()
1901            .schema(schema)
1902            .primary_key_indices(vec![0])
1903            .engine("engine")
1904            .next_column_id(3)
1905            .build()
1906            .unwrap();
1907
1908        // Remove column in primary key.
1909        let alter_kind = AlterKind::DropColumns {
1910            names: vec![String::from("col1")],
1911        };
1912
1913        let err = meta
1914            .builder_with_alter_kind("my_table", &alter_kind)
1915            .err()
1916            .unwrap();
1917        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1918
1919        // Remove timestamp column.
1920        let alter_kind = AlterKind::DropColumns {
1921            names: vec![String::from("ts")],
1922        };
1923
1924        let err = meta
1925            .builder_with_alter_kind("my_table", &alter_kind)
1926            .err()
1927            .unwrap();
1928        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1929    }
1930
1931    #[test]
1932    fn test_remove_partition_column() {
1933        let schema = Arc::new(new_test_schema());
1934        let meta = TableMetaBuilder::empty()
1935            .schema(schema)
1936            .primary_key_indices(vec![])
1937            .partition_key_indices(vec![0])
1938            .engine("engine")
1939            .next_column_id(3)
1940            .build()
1941            .unwrap();
1942        // Remove column in primary key.
1943        let alter_kind = AlterKind::DropColumns {
1944            names: vec![String::from("col1")],
1945        };
1946
1947        let err = meta
1948            .builder_with_alter_kind("my_table", &alter_kind)
1949            .err()
1950            .unwrap();
1951        assert_matches!(err, Error::RemovePartitionColumn { .. });
1952    }
1953
1954    #[test]
1955    fn test_change_key_column_data_type() {
1956        let schema = Arc::new(new_test_schema());
1957        let meta = TableMetaBuilder::empty()
1958            .schema(schema)
1959            .primary_key_indices(vec![0])
1960            .engine("engine")
1961            .next_column_id(3)
1962            .build()
1963            .unwrap();
1964
1965        // Remove column in primary key.
1966        let alter_kind = AlterKind::ModifyColumnTypes {
1967            columns: vec![ModifyColumnTypeRequest {
1968                column_name: "col1".to_string(),
1969                target_type: ConcreteDataType::string_datatype(),
1970            }],
1971        };
1972
1973        let err = meta
1974            .builder_with_alter_kind("my_table", &alter_kind)
1975            .err()
1976            .unwrap();
1977        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1978
1979        // Remove timestamp column.
1980        let alter_kind = AlterKind::ModifyColumnTypes {
1981            columns: vec![ModifyColumnTypeRequest {
1982                column_name: "ts".to_string(),
1983                target_type: ConcreteDataType::string_datatype(),
1984            }],
1985        };
1986
1987        let err = meta
1988            .builder_with_alter_kind("my_table", &alter_kind)
1989            .err()
1990            .unwrap();
1991        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1992    }
1993
1994    #[test]
1995    fn test_alloc_new_column() {
1996        let schema = Arc::new(new_test_schema());
1997        let mut meta = TableMetaBuilder::empty()
1998            .schema(schema)
1999            .primary_key_indices(vec![0])
2000            .engine("engine")
2001            .next_column_id(3)
2002            .build()
2003            .unwrap();
2004        assert_eq!(3, meta.next_column_id);
2005
2006        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
2007        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
2008
2009        assert_eq!(4, meta.next_column_id);
2010        assert_eq!(column_schema.name, desc.name);
2011    }
2012
2013    #[test]
2014    fn test_add_columns_with_location() {
2015        let schema = Arc::new(new_test_schema());
2016        let meta = TableMetaBuilder::empty()
2017            .schema(schema)
2018            .primary_key_indices(vec![0])
2019            // partition col: col1, col2
2020            .partition_key_indices(vec![0, 2])
2021            .engine("engine")
2022            .next_column_id(3)
2023            .build()
2024            .unwrap();
2025
2026        let new_meta = add_columns_to_meta_with_location(&meta);
2027        let names: Vec<String> = new_meta
2028            .schema
2029            .column_schemas()
2030            .iter()
2031            .map(|column_schema| column_schema.name.clone())
2032            .collect();
2033        assert_eq!(
2034            &[
2035                "my_tag_first",               // primary key column
2036                "col1",                       // partition column
2037                "ts",                         // timestamp column
2038                "yet_another_field_after_ts", // primary key column
2039                "my_field_after_ts",          // value column
2040                "col2",                       // partition column
2041            ],
2042            &names[..]
2043        );
2044        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2045        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2046        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2047    }
2048
2049    #[test]
2050    fn test_modify_column_fulltext_options() {
2051        let schema = Arc::new(new_test_schema());
2052        let meta = TableMetaBuilder::empty()
2053            .schema(schema)
2054            .primary_key_indices(vec![0])
2055            .engine("engine")
2056            .next_column_id(3)
2057            .build()
2058            .unwrap();
2059
2060        let alter_kind = AlterKind::SetIndexes {
2061            options: vec![SetIndexOption::Fulltext {
2062                column_name: "col1".to_string(),
2063                options: FulltextOptions::default(),
2064            }],
2065        };
2066        let err = meta
2067            .builder_with_alter_kind("my_table", &alter_kind)
2068            .err()
2069            .unwrap();
2070        assert_eq!(
2071            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2072            err.to_string()
2073        );
2074
2075        // Add a string column and make it fulltext indexed
2076        let new_meta = add_columns_to_meta_with_location(&meta);
2077        let alter_kind = AlterKind::SetIndexes {
2078            options: vec![SetIndexOption::Fulltext {
2079                column_name: "my_tag_first".to_string(),
2080                options: FulltextOptions::new_unchecked(
2081                    true,
2082                    FulltextAnalyzer::Chinese,
2083                    true,
2084                    FulltextBackend::Bloom,
2085                    1000,
2086                    0.01,
2087                ),
2088            }],
2089        };
2090        let new_meta = new_meta
2091            .builder_with_alter_kind("my_table", &alter_kind)
2092            .unwrap()
2093            .build()
2094            .unwrap();
2095        let column_schema = new_meta
2096            .schema
2097            .column_schema_by_name("my_tag_first")
2098            .unwrap();
2099        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2100        assert!(fulltext_options.enable);
2101        assert_eq!(
2102            datatypes::schema::FulltextAnalyzer::Chinese,
2103            fulltext_options.analyzer
2104        );
2105        assert!(fulltext_options.case_sensitive);
2106
2107        let alter_kind = AlterKind::UnsetIndexes {
2108            options: vec![UnsetIndexOption::Fulltext {
2109                column_name: "my_tag_first".to_string(),
2110            }],
2111        };
2112        let new_meta = new_meta
2113            .builder_with_alter_kind("my_table", &alter_kind)
2114            .unwrap()
2115            .build()
2116            .unwrap();
2117        let column_schema = new_meta
2118            .schema
2119            .column_schema_by_name("my_tag_first")
2120            .unwrap();
2121        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2122        assert!(!fulltext_options.enable);
2123    }
2124}