table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39    TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46/// Indicates whether and how a filter expression can be handled by a
47/// Table for table scans.
48#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50    /// The expression cannot be used by the provider.
51    Unsupported,
52    /// The expression can be used to help minimise the data retrieved,
53    /// but the provider cannot guarantee that all returned tuples
54    /// satisfy the filter. The Filter plan node containing this expression
55    /// will be preserved.
56    Inexact,
57    /// The provider guarantees that all returned data satisfies this
58    /// filter expression. The Filter plan node containing this expression
59    /// will be removed.
60    Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
83/// Indicates the type of this table for metadata/catalog purposes.
84#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86    /// An ordinary physical table.
87    Base,
88    /// A non-materialised table that itself uses a query internally to provide data.
89    View,
90    /// A transient table.
91    Temporary,
92}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105    fn from(t: TableType) -> datafusion::datasource::TableType {
106        match t {
107            TableType::Base => datafusion::datasource::TableType::Base,
108            TableType::View => datafusion::datasource::TableType::View,
109            TableType::Temporary => datafusion::datasource::TableType::Temporary,
110        }
111    }
112}
113
114/// Identifier of the table.
115#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
116pub struct TableIdent {
117    /// Unique id of this table.
118    pub table_id: TableId,
119    /// Version of the table, bumped when metadata (such as schema) of the table
120    /// being changed.
121    pub version: TableVersion,
122}
123
124/// The table metadata.
125///
126/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
127#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
128#[builder(pattern = "mutable", custom_constructor)]
129pub struct TableMeta {
130    pub schema: SchemaRef,
131    /// The indices of columns in primary key. Note that the index of timestamp column
132    /// is not included in these indices.
133    pub primary_key_indices: Vec<usize>,
134    #[builder(default = "self.default_value_indices()?")]
135    pub value_indices: Vec<usize>,
136    #[builder(default, setter(into))]
137    pub engine: String,
138    #[builder(default, setter(into))]
139    pub region_numbers: Vec<u32>,
140    pub next_column_id: ColumnId,
141    /// Table options.
142    #[builder(default)]
143    pub options: TableOptions,
144    #[builder(default = "Utc::now()")]
145    pub created_on: DateTime<Utc>,
146    #[builder(default = "self.default_updated_on()")]
147    pub updated_on: DateTime<Utc>,
148    #[builder(default = "Vec::new()")]
149    pub partition_key_indices: Vec<usize>,
150    #[builder(default = "Vec::new()")]
151    pub column_ids: Vec<ColumnId>,
152}
153
154impl TableMetaBuilder {
155    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
156    #[cfg(any(test, feature = "testing"))]
157    pub fn empty() -> Self {
158        Self {
159            schema: None,
160            primary_key_indices: None,
161            value_indices: None,
162            engine: None,
163            region_numbers: None,
164            next_column_id: None,
165            options: None,
166            created_on: None,
167            updated_on: None,
168            partition_key_indices: None,
169            column_ids: None,
170        }
171    }
172}
173
174impl TableMetaBuilder {
175    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
176        match (&self.primary_key_indices, &self.schema) {
177            (Some(v), Some(schema)) => {
178                let column_schemas = schema.column_schemas();
179                Ok((0..column_schemas.len())
180                    .filter(|idx| !v.contains(idx))
181                    .collect())
182            }
183            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
184        }
185    }
186
187    fn default_updated_on(&self) -> DateTime<Utc> {
188        self.created_on.unwrap_or_default()
189    }
190
191    pub fn new_external_table() -> Self {
192        Self {
193            schema: None,
194            primary_key_indices: Some(Vec::new()),
195            value_indices: Some(Vec::new()),
196            engine: None,
197            region_numbers: Some(Vec::new()),
198            next_column_id: Some(0),
199            options: None,
200            created_on: None,
201            updated_on: None,
202            partition_key_indices: None,
203            column_ids: None,
204        }
205    }
206}
207
208/// The result after splitting requests by column location info.
209struct SplitResult<'a> {
210    /// column requests should be added at first place.
211    columns_at_first: Vec<&'a AddColumnRequest>,
212    /// column requests should be added after already exist columns.
213    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
214    /// column requests should be added at last place.
215    columns_at_last: Vec<&'a AddColumnRequest>,
216    /// all column names should be added.
217    column_names: Vec<String>,
218}
219
220impl TableMeta {
221    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
222        let columns_schemas = &self.schema.column_schemas();
223        self.primary_key_indices
224            .iter()
225            .map(|idx| &columns_schemas[*idx].name)
226    }
227
228    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
229        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
230        let columns_schemas = self.schema.column_schemas();
231        let primary_key_indices = &self.primary_key_indices;
232        columns_schemas
233            .iter()
234            .enumerate()
235            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
236            .map(|(_, cs)| &cs.name)
237    }
238
239    pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
240        let columns_schemas = &self.schema.column_schemas();
241        self.partition_key_indices
242            .iter()
243            .map(|idx| &columns_schemas[*idx].name)
244    }
245
246    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
247    ///
248    /// The returned builder would derive the next column id of this meta.
249    pub fn builder_with_alter_kind(
250        &self,
251        table_name: &str,
252        alter_kind: &AlterKind,
253    ) -> Result<TableMetaBuilder> {
254        let mut builder = match alter_kind {
255            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
256            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
257            AlterKind::ModifyColumnTypes { columns } => {
258                self.modify_column_types(table_name, columns)
259            }
260            // No need to rebuild table meta when renaming tables.
261            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
262            AlterKind::SetTableOptions { options } => self.set_table_options(options),
263            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
264            AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
265            AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
266            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
267            AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
268        }?;
269        let _ = builder.updated_on(Utc::now());
270        Ok(builder)
271    }
272
273    /// Creates a [TableMetaBuilder] with modified table options.
274    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
275        let mut new_options = self.options.clone();
276
277        for request in requests {
278            match request {
279                SetRegionOption::Ttl(new_ttl) => {
280                    new_options.ttl = *new_ttl;
281                }
282                SetRegionOption::Twsc(key, value) => {
283                    if !value.is_empty() {
284                        new_options.extra_options.insert(key.clone(), value.clone());
285                        // Ensure node restart correctly.
286                        new_options.extra_options.insert(
287                            COMPACTION_TYPE.to_string(),
288                            COMPACTION_TYPE_TWCS.to_string(),
289                        );
290                    } else {
291                        // Invalidate the previous change option if an empty value has been set.
292                        new_options.extra_options.remove(key.as_str());
293                    }
294                }
295                SetRegionOption::Format(value) => {
296                    new_options
297                        .extra_options
298                        .insert(SST_FORMAT_KEY.to_string(), value.clone());
299                }
300            }
301        }
302        let mut builder = self.new_meta_builder();
303        builder.options(new_options);
304
305        Ok(builder)
306    }
307
308    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
309        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
310        self.set_table_options(&requests)
311    }
312
313    fn set_indexes(
314        &self,
315        table_name: &str,
316        requests: &[SetIndexOption],
317    ) -> Result<TableMetaBuilder> {
318        let table_schema = &self.schema;
319        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
320        for request in requests {
321            let column_name = request.column_name();
322            table_schema
323                .column_index_by_name(column_name)
324                .with_context(|| error::ColumnNotExistsSnafu {
325                    column_name,
326                    table_name,
327                })?;
328            set_index_options
329                .entry(column_name)
330                .or_default()
331                .push(request);
332        }
333
334        let mut meta_builder = self.new_meta_builder();
335        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
336        for mut column in table_schema.column_schemas().iter().cloned() {
337            if let Some(request) = set_index_options.get(column.name.as_str()) {
338                for request in request {
339                    self.set_index(&mut column, request)?;
340                }
341            }
342            columns.push(column);
343        }
344
345        let mut builder = SchemaBuilder::try_from_columns(columns)
346            .with_context(|_| error::SchemaBuildSnafu {
347                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
348            })?
349            .version(table_schema.version() + 1);
350
351        for (k, v) in table_schema.metadata().iter() {
352            builder = builder.add_metadata(k, v);
353        }
354
355        let new_schema = builder.build().with_context(|_| {
356            let column_names = requests
357                .iter()
358                .map(|request| request.column_name())
359                .collect::<Vec<_>>();
360            error::SchemaBuildSnafu {
361                msg: format!(
362                    "Table {table_name} cannot set index options with columns {column_names:?}",
363                ),
364            }
365        })?;
366        let _ = meta_builder
367            .schema(Arc::new(new_schema))
368            .primary_key_indices(self.primary_key_indices.clone());
369
370        Ok(meta_builder)
371    }
372
373    fn unset_indexes(
374        &self,
375        table_name: &str,
376        requests: &[UnsetIndexOption],
377    ) -> Result<TableMetaBuilder> {
378        let table_schema = &self.schema;
379        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
380        for request in requests {
381            let column_name = request.column_name();
382            table_schema
383                .column_index_by_name(column_name)
384                .with_context(|| error::ColumnNotExistsSnafu {
385                    column_name,
386                    table_name,
387                })?;
388            set_index_options
389                .entry(column_name)
390                .or_default()
391                .push(request);
392        }
393
394        let mut meta_builder = self.new_meta_builder();
395        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
396        for mut column in table_schema.column_schemas().iter().cloned() {
397            if let Some(request) = set_index_options.get(column.name.as_str()) {
398                for request in request {
399                    self.unset_index(&mut column, request)?;
400                }
401            }
402            columns.push(column);
403        }
404
405        let mut builder = SchemaBuilder::try_from_columns(columns)
406            .with_context(|_| error::SchemaBuildSnafu {
407                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
408            })?
409            .version(table_schema.version() + 1);
410
411        for (k, v) in table_schema.metadata().iter() {
412            builder = builder.add_metadata(k, v);
413        }
414
415        let new_schema = builder.build().with_context(|_| {
416            let column_names = requests
417                .iter()
418                .map(|request| request.column_name())
419                .collect::<Vec<_>>();
420            error::SchemaBuildSnafu {
421                msg: format!(
422                    "Table {table_name} cannot set index options with columns {column_names:?}",
423                ),
424            }
425        })?;
426        let _ = meta_builder
427            .schema(Arc::new(new_schema))
428            .primary_key_indices(self.primary_key_indices.clone());
429
430        Ok(meta_builder)
431    }
432
433    fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
434        match request {
435            SetIndexOption::Fulltext {
436                column_name,
437                options,
438            } => {
439                ensure!(
440                    column_schema.data_type.is_string(),
441                    error::InvalidColumnOptionSnafu {
442                        column_name,
443                        msg: "FULLTEXT index only supports string type",
444                    }
445                );
446
447                let current_fulltext_options = column_schema
448                    .fulltext_options()
449                    .context(error::SetFulltextOptionsSnafu { column_name })?;
450                set_column_fulltext_options(
451                    column_schema,
452                    column_name,
453                    options,
454                    current_fulltext_options,
455                )?;
456            }
457            SetIndexOption::Inverted { column_name } => {
458                debug_assert_eq!(column_schema.name, *column_name);
459                column_schema.set_inverted_index(true);
460            }
461            SetIndexOption::Skipping {
462                column_name,
463                options,
464            } => {
465                set_column_skipping_index_options(column_schema, column_name, options)?;
466            }
467        }
468
469        Ok(())
470    }
471
472    fn unset_index(
473        &self,
474        column_schema: &mut ColumnSchema,
475        request: &UnsetIndexOption,
476    ) -> Result<()> {
477        match request {
478            UnsetIndexOption::Fulltext { column_name } => {
479                let current_fulltext_options = column_schema
480                    .fulltext_options()
481                    .context(error::SetFulltextOptionsSnafu { column_name })?;
482                unset_column_fulltext_options(
483                    column_schema,
484                    column_name,
485                    current_fulltext_options.clone(),
486                )?
487            }
488            UnsetIndexOption::Inverted { .. } => {
489                column_schema.set_inverted_index(false);
490            }
491            UnsetIndexOption::Skipping { column_name } => {
492                unset_column_skipping_index_options(column_schema, column_name)?;
493            }
494        }
495
496        Ok(())
497    }
498
499    // TODO(yingwen): Remove this.
500    /// Allocate a new column for the table.
501    ///
502    /// This method would bump the `next_column_id` of the meta.
503    pub fn alloc_new_column(
504        &mut self,
505        table_name: &str,
506        new_column: &ColumnSchema,
507    ) -> Result<ColumnDescriptor> {
508        let desc = ColumnDescriptorBuilder::new(
509            self.next_column_id as ColumnId,
510            &new_column.name,
511            new_column.data_type.clone(),
512        )
513        .is_nullable(new_column.is_nullable())
514        .default_constraint(new_column.default_constraint().cloned())
515        .build()
516        .context(error::BuildColumnDescriptorSnafu {
517            table_name,
518            column_name: &new_column.name,
519        })?;
520
521        // Bump next column id.
522        self.next_column_id += 1;
523
524        Ok(desc)
525    }
526
527    /// Create a [`TableMetaBuilder`] from the current TableMeta.
528    fn new_meta_builder(&self) -> TableMetaBuilder {
529        let mut builder = TableMetaBuilder::from(self);
530        // Manually remove value_indices.
531        builder.value_indices = None;
532        builder
533    }
534
535    // TODO(yingwen): Tests add if not exists.
536    fn add_columns(
537        &self,
538        table_name: &str,
539        requests: &[AddColumnRequest],
540    ) -> Result<TableMetaBuilder> {
541        let table_schema = &self.schema;
542        let mut meta_builder = self.new_meta_builder();
543        let original_primary_key_indices: HashSet<&usize> =
544            self.primary_key_indices.iter().collect();
545
546        let mut names = HashSet::with_capacity(requests.len());
547        let mut new_columns = Vec::with_capacity(requests.len());
548        for col_to_add in requests {
549            if let Some(column_schema) =
550                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
551            {
552                // If the column already exists.
553                ensure!(
554                    col_to_add.add_if_not_exists,
555                    error::ColumnExistsSnafu {
556                        table_name,
557                        column_name: &col_to_add.column_schema.name
558                    },
559                );
560
561                // Checks if the type is the same
562                ensure!(
563                    column_schema.data_type == col_to_add.column_schema.data_type,
564                    error::InvalidAlterRequestSnafu {
565                        table: table_name,
566                        err: format!(
567                            "column {} already exists with different type {:?}",
568                            col_to_add.column_schema.name, column_schema.data_type,
569                        ),
570                    }
571                );
572            } else {
573                // A new column.
574                // Ensures we only add a column once.
575                ensure!(
576                    names.insert(&col_to_add.column_schema.name),
577                    error::InvalidAlterRequestSnafu {
578                        table: table_name,
579                        err: format!(
580                            "add column {} more than once",
581                            col_to_add.column_schema.name
582                        ),
583                    }
584                );
585
586                ensure!(
587                    col_to_add.column_schema.is_nullable()
588                        || col_to_add.column_schema.default_constraint().is_some(),
589                    error::InvalidAlterRequestSnafu {
590                        table: table_name,
591                        err: format!(
592                            "no default value for column {}",
593                            col_to_add.column_schema.name
594                        ),
595                    },
596                );
597
598                new_columns.push(col_to_add.clone());
599            }
600        }
601        let requests = &new_columns[..];
602
603        let SplitResult {
604            columns_at_first,
605            columns_at_after,
606            columns_at_last,
607            column_names,
608        } = self.split_requests_by_column_location(table_name, requests)?;
609        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
610        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
611        // add new columns with FIRST, and in reverse order of requests.
612        columns_at_first.iter().rev().for_each(|request| {
613            if request.is_key {
614                // If a key column is added, we also need to store its index in primary_key_indices.
615                primary_key_indices.push(columns.len());
616            }
617            columns.push(request.column_schema.clone());
618        });
619        // add existed columns in original order and handle new columns with AFTER.
620        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
621            if original_primary_key_indices.contains(&index) {
622                primary_key_indices.push(columns.len());
623            }
624            columns.push(column_schema.clone());
625            if let Some(requests) = columns_at_after.get(&column_schema.name) {
626                requests.iter().rev().for_each(|request| {
627                    if request.is_key {
628                        // If a key column is added, we also need to store its index in primary_key_indices.
629                        primary_key_indices.push(columns.len());
630                    }
631                    columns.push(request.column_schema.clone());
632                });
633            }
634        }
635        // add new columns without location info to last.
636        columns_at_last.iter().for_each(|request| {
637            if request.is_key {
638                // If a key column is added, we also need to store its index in primary_key_indices.
639                primary_key_indices.push(columns.len());
640            }
641            columns.push(request.column_schema.clone());
642        });
643
644        let mut builder = SchemaBuilder::try_from(columns)
645            .with_context(|_| error::SchemaBuildSnafu {
646                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
647            })?
648            // Also bump the schema version.
649            .version(table_schema.version() + 1);
650        for (k, v) in table_schema.metadata().iter() {
651            builder = builder.add_metadata(k, v);
652        }
653        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
654            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
655        })?;
656
657        let partition_key_indices = self
658            .partition_key_indices
659            .iter()
660            .map(|idx| table_schema.column_name_by_index(*idx))
661            // This unwrap is safe since we only add new columns.
662            .map(|name| new_schema.column_index_by_name(name).unwrap())
663            .collect();
664
665        // value_indices would be generated automatically.
666        let _ = meta_builder
667            .schema(Arc::new(new_schema))
668            .primary_key_indices(primary_key_indices)
669            .partition_key_indices(partition_key_indices);
670
671        Ok(meta_builder)
672    }
673
674    fn remove_columns(
675        &self,
676        table_name: &str,
677        column_names: &[String],
678    ) -> Result<TableMetaBuilder> {
679        let table_schema = &self.schema;
680        let column_names: HashSet<_> = column_names.iter().collect();
681        let mut meta_builder = self.new_meta_builder();
682
683        let timestamp_index = table_schema.timestamp_index();
684        // Check whether columns are existing and not in primary key index.
685        for column_name in &column_names {
686            if let Some(index) = table_schema.column_index_by_name(column_name) {
687                // This is a linear search, but since there won't be too much columns, the performance should
688                // be acceptable.
689                ensure!(
690                    !self.primary_key_indices.contains(&index),
691                    error::RemoveColumnInIndexSnafu {
692                        column_name: *column_name,
693                        table_name,
694                    }
695                );
696
697                ensure!(
698                    !self.partition_key_indices.contains(&index),
699                    error::RemovePartitionColumnSnafu {
700                        column_name: *column_name,
701                        table_name,
702                    }
703                );
704
705                if let Some(ts_index) = timestamp_index {
706                    // Not allowed to remove column in timestamp index.
707                    ensure!(
708                        index != ts_index,
709                        error::RemoveColumnInIndexSnafu {
710                            column_name: table_schema.column_name_by_index(ts_index),
711                            table_name,
712                        }
713                    );
714                }
715            } else {
716                return error::ColumnNotExistsSnafu {
717                    column_name: *column_name,
718                    table_name,
719                }
720                .fail()?;
721            }
722        }
723
724        // Collect columns after removal.
725        let columns: Vec<_> = table_schema
726            .column_schemas()
727            .iter()
728            .filter(|column_schema| !column_names.contains(&column_schema.name))
729            .cloned()
730            .collect();
731
732        let mut builder = SchemaBuilder::try_from_columns(columns)
733            .with_context(|_| error::SchemaBuildSnafu {
734                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
735            })?
736            // Also bump the schema version.
737            .version(table_schema.version() + 1);
738        for (k, v) in table_schema.metadata().iter() {
739            builder = builder.add_metadata(k, v);
740        }
741        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
742            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
743        })?;
744
745        // Rebuild the indices of primary key columns.
746        let primary_key_indices = self
747            .primary_key_indices
748            .iter()
749            .map(|idx| table_schema.column_name_by_index(*idx))
750            // This unwrap is safe since we don't allow removing a primary key column.
751            .map(|name| new_schema.column_index_by_name(name).unwrap())
752            .collect();
753
754        let partition_key_indices = self
755            .partition_key_indices
756            .iter()
757            .map(|idx| table_schema.column_name_by_index(*idx))
758            // This unwrap is safe since we don't allow removing a partition key column.
759            .map(|name| new_schema.column_index_by_name(name).unwrap())
760            .collect();
761
762        let _ = meta_builder
763            .schema(Arc::new(new_schema))
764            .primary_key_indices(primary_key_indices)
765            .partition_key_indices(partition_key_indices);
766
767        Ok(meta_builder)
768    }
769
770    fn modify_column_types(
771        &self,
772        table_name: &str,
773        requests: &[ModifyColumnTypeRequest],
774    ) -> Result<TableMetaBuilder> {
775        let table_schema = &self.schema;
776        let mut meta_builder = self.new_meta_builder();
777
778        let mut modify_column_types = HashMap::with_capacity(requests.len());
779        let timestamp_index = table_schema.timestamp_index();
780
781        for col_to_change in requests {
782            let change_column_name = &col_to_change.column_name;
783
784            let index = table_schema
785                .column_index_by_name(change_column_name)
786                .with_context(|| error::ColumnNotExistsSnafu {
787                    column_name: change_column_name,
788                    table_name,
789                })?;
790
791            let column = &table_schema.column_schemas()[index];
792
793            ensure!(
794                !self.primary_key_indices.contains(&index),
795                error::InvalidAlterRequestSnafu {
796                    table: table_name,
797                    err: format!(
798                        "Not allowed to change primary key index column '{}'",
799                        column.name
800                    )
801                }
802            );
803
804            if let Some(ts_index) = timestamp_index {
805                // Not allowed to change column datatype in timestamp index.
806                ensure!(
807                    index != ts_index,
808                    error::InvalidAlterRequestSnafu {
809                        table: table_name,
810                        err: format!(
811                            "Not allowed to change timestamp index column '{}' datatype",
812                            column.name
813                        )
814                    }
815                );
816            }
817
818            ensure!(
819                modify_column_types
820                    .insert(&col_to_change.column_name, col_to_change)
821                    .is_none(),
822                error::InvalidAlterRequestSnafu {
823                    table: table_name,
824                    err: format!(
825                        "change column datatype {} more than once",
826                        col_to_change.column_name
827                    ),
828                }
829            );
830
831            ensure!(
832                column
833                    .data_type
834                    .can_arrow_type_cast_to(&col_to_change.target_type),
835                error::InvalidAlterRequestSnafu {
836                    table: table_name,
837                    err: format!(
838                        "column '{}' cannot be cast automatically to type '{}'",
839                        col_to_change.column_name, col_to_change.target_type,
840                    ),
841                }
842            );
843
844            ensure!(
845                column.is_nullable(),
846                error::InvalidAlterRequestSnafu {
847                    table: table_name,
848                    err: format!(
849                        "column '{}' must be nullable to ensure safe conversion.",
850                        col_to_change.column_name,
851                    ),
852                }
853            );
854        }
855        // Collect columns after changed.
856
857        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
858        for mut column in table_schema.column_schemas().iter().cloned() {
859            if let Some(change_column) = modify_column_types.get(&column.name) {
860                column.data_type = change_column.target_type.clone();
861                let new_default = if let Some(default_value) = column.default_constraint() {
862                    Some(
863                        default_value
864                            .cast_to_datatype(&change_column.target_type)
865                            .with_context(|_| error::CastDefaultValueSnafu {
866                                reason: format!(
867                                    "Failed to cast default value from {:?} to type {:?}",
868                                    default_value, &change_column.target_type
869                                ),
870                            })?,
871                    )
872                } else {
873                    None
874                };
875                column = column
876                    .clone()
877                    .with_default_constraint(new_default.clone())
878                    .with_context(|_| error::CastDefaultValueSnafu {
879                        reason: format!("Failed to set new default: {:?}", new_default),
880                    })?;
881            }
882            columns.push(column)
883        }
884
885        let mut builder = SchemaBuilder::try_from_columns(columns)
886            .with_context(|_| error::SchemaBuildSnafu {
887                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
888            })?
889            // Also bump the schema version.
890            .version(table_schema.version() + 1);
891        for (k, v) in table_schema.metadata().iter() {
892            builder = builder.add_metadata(k, v);
893        }
894        let new_schema = builder.build().with_context(|_| {
895            let column_names: Vec<_> = requests
896                .iter()
897                .map(|request| &request.column_name)
898                .collect();
899
900            error::SchemaBuildSnafu {
901                msg: format!(
902                    "Table {table_name} cannot change datatype with columns {column_names:?}"
903                ),
904            }
905        })?;
906
907        let _ = meta_builder
908            .schema(Arc::new(new_schema))
909            .primary_key_indices(self.primary_key_indices.clone());
910
911        Ok(meta_builder)
912    }
913
914    /// Split requests into different groups using column location info.
915    fn split_requests_by_column_location<'a>(
916        &self,
917        table_name: &str,
918        requests: &'a [AddColumnRequest],
919    ) -> Result<SplitResult<'a>> {
920        let table_schema = &self.schema;
921        let mut columns_at_first = Vec::new();
922        let mut columns_at_after = HashMap::new();
923        let mut columns_at_last = Vec::new();
924        let mut column_names = Vec::with_capacity(requests.len());
925        for request in requests {
926            // Check whether columns to add are already existing.
927            let column_name = &request.column_schema.name;
928            column_names.push(column_name.clone());
929            ensure!(
930                table_schema.column_schema_by_name(column_name).is_none(),
931                error::ColumnExistsSnafu {
932                    column_name,
933                    table_name,
934                }
935            );
936            match request.location.as_ref() {
937                Some(AddColumnLocation::First) => {
938                    columns_at_first.push(request);
939                }
940                Some(AddColumnLocation::After { column_name }) => {
941                    ensure!(
942                        table_schema.column_schema_by_name(column_name).is_some(),
943                        error::ColumnNotExistsSnafu {
944                            column_name,
945                            table_name,
946                        }
947                    );
948                    columns_at_after
949                        .entry(column_name.clone())
950                        .or_insert(Vec::new())
951                        .push(request);
952                }
953                None => {
954                    columns_at_last.push(request);
955                }
956            }
957        }
958        Ok(SplitResult {
959            columns_at_first,
960            columns_at_after,
961            columns_at_last,
962            column_names,
963        })
964    }
965
966    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
967        let table_schema = &self.schema;
968        let mut meta_builder = self.new_meta_builder();
969        let mut columns = Vec::with_capacity(table_schema.num_columns());
970        for column_schema in table_schema.column_schemas() {
971            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
972                // Drop default constraint.
973                ensure!(
974                    column_schema.default_constraint().is_some(),
975                    error::InvalidAlterRequestSnafu {
976                        table: table_name,
977                        err: format!("column {name} does not have a default value"),
978                    }
979                );
980                if !column_schema.is_nullable() {
981                    return error::InvalidAlterRequestSnafu {
982                        table: table_name,
983                        err: format!(
984                            "column {name} is not nullable and `default` cannot be dropped",
985                        ),
986                    }
987                    .fail();
988                }
989                let new_column_schema = column_schema.clone();
990                let new_column_schema = new_column_schema
991                    .with_default_constraint(None)
992                    .with_context(|_| error::SchemaBuildSnafu {
993                        msg: format!("Table {table_name} cannot drop default values"),
994                    })?;
995                columns.push(new_column_schema);
996            } else {
997                columns.push(column_schema.clone());
998            }
999        }
1000
1001        let mut builder = SchemaBuilder::try_from_columns(columns)
1002            .with_context(|_| error::SchemaBuildSnafu {
1003                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1004            })?
1005            // Also bump the schema version.
1006            .version(table_schema.version() + 1);
1007        for (k, v) in table_schema.metadata().iter() {
1008            builder = builder.add_metadata(k, v);
1009        }
1010        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1011            msg: format!("Table {table_name} cannot drop default values"),
1012        })?;
1013
1014        let _ = meta_builder.schema(Arc::new(new_schema));
1015
1016        Ok(meta_builder)
1017    }
1018
1019    fn set_defaults(
1020        &self,
1021        table_name: &str,
1022        set_defaults: &[SetDefaultRequest],
1023    ) -> Result<TableMetaBuilder> {
1024        let table_schema = &self.schema;
1025        let mut meta_builder = self.new_meta_builder();
1026        let mut columns = Vec::with_capacity(table_schema.num_columns());
1027        for column_schema in table_schema.column_schemas() {
1028            if let Some(set_default) = set_defaults
1029                .iter()
1030                .find(|s| s.column_name == column_schema.name)
1031            {
1032                let new_column_schema = column_schema.clone();
1033                let new_column_schema = new_column_schema
1034                    .with_default_constraint(set_default.default_constraint.clone())
1035                    .with_context(|_| error::SchemaBuildSnafu {
1036                        msg: format!("Table {table_name} cannot set default values"),
1037                    })?;
1038                columns.push(new_column_schema);
1039            } else {
1040                columns.push(column_schema.clone());
1041            }
1042        }
1043
1044        let mut builder = SchemaBuilder::try_from_columns(columns)
1045            .with_context(|_| error::SchemaBuildSnafu {
1046                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1047            })?
1048            // Also bump the schema version.
1049            .version(table_schema.version() + 1);
1050        for (k, v) in table_schema.metadata().iter() {
1051            builder = builder.add_metadata(k, v);
1052        }
1053        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1054            msg: format!("Table {table_name} cannot set default values"),
1055        })?;
1056
1057        let _ = meta_builder.schema(Arc::new(new_schema));
1058
1059        Ok(meta_builder)
1060    }
1061}
1062
1063#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1064#[builder(pattern = "owned")]
1065pub struct TableInfo {
1066    /// Id and version of the table.
1067    #[builder(default, setter(into))]
1068    pub ident: TableIdent,
1069    /// Name of the table.
1070    #[builder(setter(into))]
1071    pub name: String,
1072    /// Comment of the table.
1073    #[builder(default, setter(into))]
1074    pub desc: Option<String>,
1075    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1076    pub catalog_name: String,
1077    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1078    pub schema_name: String,
1079    pub meta: TableMeta,
1080    #[builder(default = "TableType::Base")]
1081    pub table_type: TableType,
1082}
1083
1084pub type TableInfoRef = Arc<TableInfo>;
1085
1086impl TableInfo {
1087    pub fn table_id(&self) -> TableId {
1088        self.ident.table_id
1089    }
1090
1091    pub fn region_ids(&self) -> Vec<RegionId> {
1092        self.meta
1093            .region_numbers
1094            .iter()
1095            .map(|id| RegionId::new(self.table_id(), *id))
1096            .collect()
1097    }
1098    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1099    pub fn full_table_name(&self) -> String {
1100        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1101    }
1102
1103    pub fn get_db_string(&self) -> String {
1104        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1105    }
1106
1107    /// Returns true when the table is the metric engine's physical table.
1108    pub fn is_physical_table(&self) -> bool {
1109        self.meta
1110            .options
1111            .extra_options
1112            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1113    }
1114
1115    /// Return true if the table's TTL is `instant`.
1116    pub fn is_ttl_instant_table(&self) -> bool {
1117        self.meta
1118            .options
1119            .ttl
1120            .map(|t| t.is_instant())
1121            .unwrap_or(false)
1122    }
1123}
1124
1125impl TableInfoBuilder {
1126    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1127        Self {
1128            name: Some(name.into()),
1129            meta: Some(meta),
1130            ..Default::default()
1131        }
1132    }
1133
1134    pub fn table_id(mut self, id: TableId) -> Self {
1135        let ident = self.ident.get_or_insert_with(TableIdent::default);
1136        ident.table_id = id;
1137        self
1138    }
1139
1140    pub fn table_version(mut self, version: TableVersion) -> Self {
1141        let ident = self.ident.get_or_insert_with(TableIdent::default);
1142        ident.version = version;
1143        self
1144    }
1145}
1146
1147impl TableIdent {
1148    pub fn new(table_id: TableId) -> Self {
1149        Self {
1150            table_id,
1151            version: 0,
1152        }
1153    }
1154}
1155
1156impl From<TableId> for TableIdent {
1157    fn from(table_id: TableId) -> Self {
1158        Self::new(table_id)
1159    }
1160}
1161
1162/// Struct used to serialize and deserialize [`TableMeta`].
1163#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
1164pub struct RawTableMeta {
1165    pub schema: RawSchema,
1166    /// The indices of columns in primary key. Note that the index of timestamp column
1167    /// is not included. Order matters to this array.
1168    pub primary_key_indices: Vec<usize>,
1169    ///  The indices of columns in value. The index of timestamp column is included.
1170    /// Order doesn't matter to this array.
1171    pub value_indices: Vec<usize>,
1172    /// Engine type of this table. Usually in small case.
1173    pub engine: String,
1174    /// Next column id of a new column.
1175    /// It's used to ensure all columns with the same name across all regions have the same column id.
1176    pub next_column_id: ColumnId,
1177    pub region_numbers: Vec<u32>,
1178    pub options: TableOptions,
1179    pub created_on: DateTime<Utc>,
1180    pub updated_on: DateTime<Utc>,
1181    /// Order doesn't matter to this array.
1182    #[serde(default)]
1183    pub partition_key_indices: Vec<usize>,
1184    /// Map of column name to column id.
1185    /// Note: This field may be empty for older versions that did not include this field.
1186    #[serde(default)]
1187    pub column_ids: Vec<ColumnId>,
1188}
1189
1190impl<'de> Deserialize<'de> for RawTableMeta {
1191    fn deserialize<D>(
1192        deserializer: D,
1193    ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1194    where
1195        D: Deserializer<'de>,
1196    {
1197        #[derive(Deserialize)]
1198        struct Helper {
1199            schema: RawSchema,
1200            primary_key_indices: Vec<usize>,
1201            value_indices: Vec<usize>,
1202            engine: String,
1203            next_column_id: u32,
1204            region_numbers: Vec<u32>,
1205            options: TableOptions,
1206            created_on: DateTime<Utc>,
1207            updated_on: Option<DateTime<Utc>>,
1208            #[serde(default)]
1209            partition_key_indices: Vec<usize>,
1210            #[serde(default)]
1211            column_ids: Vec<ColumnId>,
1212        }
1213
1214        let h = Helper::deserialize(deserializer)?;
1215        Ok(RawTableMeta {
1216            schema: h.schema,
1217            primary_key_indices: h.primary_key_indices,
1218            value_indices: h.value_indices,
1219            engine: h.engine,
1220            next_column_id: h.next_column_id,
1221            region_numbers: h.region_numbers,
1222            options: h.options,
1223            created_on: h.created_on,
1224            updated_on: h.updated_on.unwrap_or(h.created_on),
1225            partition_key_indices: h.partition_key_indices,
1226            column_ids: h.column_ids,
1227        })
1228    }
1229}
1230
1231impl From<TableMeta> for RawTableMeta {
1232    fn from(meta: TableMeta) -> RawTableMeta {
1233        RawTableMeta {
1234            schema: RawSchema::from(&*meta.schema),
1235            primary_key_indices: meta.primary_key_indices,
1236            value_indices: meta.value_indices,
1237            engine: meta.engine,
1238            next_column_id: meta.next_column_id,
1239            region_numbers: meta.region_numbers,
1240            options: meta.options,
1241            created_on: meta.created_on,
1242            updated_on: meta.updated_on,
1243            partition_key_indices: meta.partition_key_indices,
1244            column_ids: meta.column_ids,
1245        }
1246    }
1247}
1248
1249impl TryFrom<RawTableMeta> for TableMeta {
1250    type Error = ConvertError;
1251
1252    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1253        Ok(TableMeta {
1254            schema: Arc::new(Schema::try_from(raw.schema)?),
1255            primary_key_indices: raw.primary_key_indices,
1256            value_indices: raw.value_indices,
1257            engine: raw.engine,
1258            region_numbers: raw.region_numbers,
1259            next_column_id: raw.next_column_id,
1260            options: raw.options,
1261            created_on: raw.created_on,
1262            updated_on: raw.updated_on,
1263            partition_key_indices: raw.partition_key_indices,
1264            column_ids: raw.column_ids,
1265        })
1266    }
1267}
1268
1269/// Struct used to serialize and deserialize [`TableInfo`].
1270#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1271pub struct RawTableInfo {
1272    pub ident: TableIdent,
1273    pub name: String,
1274    pub desc: Option<String>,
1275    pub catalog_name: String,
1276    pub schema_name: String,
1277    pub meta: RawTableMeta,
1278    pub table_type: TableType,
1279}
1280
1281impl RawTableInfo {
1282    /// Returns the map of column name to column id.
1283    ///
1284    /// Note: This method may return an empty map for older versions that did not include this field.
1285    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1286        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1287            None
1288        } else {
1289            Some(
1290                self.meta
1291                    .column_ids
1292                    .iter()
1293                    .enumerate()
1294                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1295                    .collect(),
1296            )
1297        }
1298    }
1299
1300    /// Sort the columns in [RawTableInfo], logical tables require it.
1301    pub fn sort_columns(&mut self) {
1302        let column_schemas = &self.meta.schema.column_schemas;
1303        let primary_keys = self
1304            .meta
1305            .primary_key_indices
1306            .iter()
1307            .map(|index| column_schemas[*index].name.clone())
1308            .collect::<HashSet<_>>();
1309
1310        let name_to_ids = self.name_to_ids().unwrap_or_default();
1311        self.meta
1312            .schema
1313            .column_schemas
1314            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1315
1316        // Compute new indices of sorted columns
1317        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1318        let mut timestamp_index = None;
1319        let mut value_indices =
1320            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1321        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1322        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1323            if primary_keys.contains(&column_schema.name) {
1324                primary_key_indices.push(index);
1325            } else if column_schema.is_time_index() {
1326                value_indices.push(index);
1327                timestamp_index = Some(index);
1328            } else {
1329                value_indices.push(index);
1330            }
1331            if let Some(id) = name_to_ids.get(&column_schema.name) {
1332                column_ids.push(*id);
1333            }
1334        }
1335
1336        // Overwrite table meta
1337        self.meta.schema.timestamp_index = timestamp_index;
1338        self.meta.primary_key_indices = primary_key_indices;
1339        self.meta.value_indices = value_indices;
1340        self.meta.column_ids = column_ids;
1341    }
1342
1343    /// Extracts region options from table info.
1344    ///
1345    /// All "region options" are actually a copy of table options for redundancy.
1346    pub fn to_region_options(&self) -> HashMap<String, String> {
1347        HashMap::from(&self.meta.options)
1348    }
1349
1350    /// Returns the table reference.
1351    pub fn table_ref(&self) -> TableReference<'_> {
1352        TableReference::full(
1353            self.catalog_name.as_str(),
1354            self.schema_name.as_str(),
1355            self.name.as_str(),
1356        )
1357    }
1358}
1359
1360impl From<TableInfo> for RawTableInfo {
1361    fn from(info: TableInfo) -> RawTableInfo {
1362        RawTableInfo {
1363            ident: info.ident,
1364            name: info.name,
1365            desc: info.desc,
1366            catalog_name: info.catalog_name,
1367            schema_name: info.schema_name,
1368            meta: RawTableMeta::from(info.meta),
1369            table_type: info.table_type,
1370        }
1371    }
1372}
1373
1374impl TryFrom<RawTableInfo> for TableInfo {
1375    type Error = ConvertError;
1376
1377    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1378        Ok(TableInfo {
1379            ident: raw.ident,
1380            name: raw.name,
1381            desc: raw.desc,
1382            catalog_name: raw.catalog_name,
1383            schema_name: raw.schema_name,
1384            meta: TableMeta::try_from(raw.meta)?,
1385            table_type: raw.table_type,
1386        })
1387    }
1388}
1389
1390/// Set column fulltext options if it passed the validation.
1391///
1392/// Options allowed to modify:
1393/// * backend
1394///
1395/// Options not allowed to modify:
1396/// * analyzer
1397/// * case_sensitive
1398fn set_column_fulltext_options(
1399    column_schema: &mut ColumnSchema,
1400    column_name: &str,
1401    options: &FulltextOptions,
1402    current_options: Option<FulltextOptions>,
1403) -> Result<()> {
1404    if let Some(current_options) = current_options {
1405        ensure!(
1406            current_options.analyzer == options.analyzer
1407                && current_options.case_sensitive == options.case_sensitive,
1408            error::InvalidColumnOptionSnafu {
1409                column_name,
1410                msg: format!(
1411                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1412                    current_options.analyzer, current_options.case_sensitive
1413                ),
1414            }
1415        );
1416    }
1417
1418    column_schema
1419        .set_fulltext_options(options)
1420        .context(error::SetFulltextOptionsSnafu { column_name })?;
1421
1422    Ok(())
1423}
1424
1425fn unset_column_fulltext_options(
1426    column_schema: &mut ColumnSchema,
1427    column_name: &str,
1428    current_options: Option<FulltextOptions>,
1429) -> Result<()> {
1430    ensure!(
1431        current_options
1432            .as_ref()
1433            .is_some_and(|options| options.enable),
1434        error::InvalidColumnOptionSnafu {
1435            column_name,
1436            msg: "FULLTEXT index already disabled".to_string(),
1437        }
1438    );
1439
1440    let mut options = current_options.unwrap();
1441    options.enable = false;
1442    column_schema
1443        .set_fulltext_options(&options)
1444        .context(error::SetFulltextOptionsSnafu { column_name })?;
1445
1446    Ok(())
1447}
1448
1449fn set_column_skipping_index_options(
1450    column_schema: &mut ColumnSchema,
1451    column_name: &str,
1452    options: &SkippingIndexOptions,
1453) -> Result<()> {
1454    column_schema
1455        .set_skipping_options(options)
1456        .context(error::SetSkippingOptionsSnafu { column_name })?;
1457
1458    Ok(())
1459}
1460
1461fn unset_column_skipping_index_options(
1462    column_schema: &mut ColumnSchema,
1463    column_name: &str,
1464) -> Result<()> {
1465    column_schema
1466        .unset_skipping_options()
1467        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1468    Ok(())
1469}
1470
1471#[cfg(test)]
1472mod tests {
1473    use std::assert_matches::assert_matches;
1474
1475    use common_error::ext::ErrorExt;
1476    use common_error::status_code::StatusCode;
1477    use datatypes::data_type::ConcreteDataType;
1478    use datatypes::schema::{
1479        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1480    };
1481
1482    use super::*;
1483    use crate::Error;
1484
1485    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1486    fn new_test_schema() -> Schema {
1487        let column_schemas = vec![
1488            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1489            ColumnSchema::new(
1490                "ts",
1491                ConcreteDataType::timestamp_millisecond_datatype(),
1492                false,
1493            )
1494            .with_time_index(true),
1495            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1496        ];
1497        SchemaBuilder::try_from(column_schemas)
1498            .unwrap()
1499            .version(123)
1500            .build()
1501            .unwrap()
1502    }
1503
1504    #[test]
1505    fn test_raw_convert() {
1506        let schema = Arc::new(new_test_schema());
1507        let meta = TableMetaBuilder::empty()
1508            .schema(schema)
1509            .primary_key_indices(vec![0])
1510            .engine("engine")
1511            .next_column_id(3)
1512            .build()
1513            .unwrap();
1514        let info = TableInfoBuilder::default()
1515            .table_id(10)
1516            .table_version(5)
1517            .name("mytable")
1518            .meta(meta)
1519            .build()
1520            .unwrap();
1521
1522        let raw = RawTableInfo::from(info.clone());
1523        let info_new = TableInfo::try_from(raw).unwrap();
1524
1525        assert_eq!(info, info_new);
1526    }
1527
1528    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1529        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1530        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1531        let alter_kind = AlterKind::AddColumns {
1532            columns: vec![
1533                AddColumnRequest {
1534                    column_schema: new_tag,
1535                    is_key: true,
1536                    location: None,
1537                    add_if_not_exists: false,
1538                },
1539                AddColumnRequest {
1540                    column_schema: new_field,
1541                    is_key: false,
1542                    location: None,
1543                    add_if_not_exists: false,
1544                },
1545            ],
1546        };
1547
1548        let builder = meta
1549            .builder_with_alter_kind("my_table", &alter_kind)
1550            .unwrap();
1551        builder.build().unwrap()
1552    }
1553
1554    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1555        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1556        let new_field = ColumnSchema::new(
1557            "my_field_after_ts",
1558            ConcreteDataType::string_datatype(),
1559            true,
1560        );
1561        let yet_another_field = ColumnSchema::new(
1562            "yet_another_field_after_ts",
1563            ConcreteDataType::int64_datatype(),
1564            true,
1565        );
1566        let alter_kind = AlterKind::AddColumns {
1567            columns: vec![
1568                AddColumnRequest {
1569                    column_schema: new_tag,
1570                    is_key: true,
1571                    location: Some(AddColumnLocation::First),
1572                    add_if_not_exists: false,
1573                },
1574                AddColumnRequest {
1575                    column_schema: new_field,
1576                    is_key: false,
1577                    location: Some(AddColumnLocation::After {
1578                        column_name: "ts".to_string(),
1579                    }),
1580                    add_if_not_exists: false,
1581                },
1582                AddColumnRequest {
1583                    column_schema: yet_another_field,
1584                    is_key: true,
1585                    location: Some(AddColumnLocation::After {
1586                        column_name: "ts".to_string(),
1587                    }),
1588                    add_if_not_exists: false,
1589                },
1590            ],
1591        };
1592
1593        let builder = meta
1594            .builder_with_alter_kind("my_table", &alter_kind)
1595            .unwrap();
1596        builder.build().unwrap()
1597    }
1598
1599    #[test]
1600    fn test_add_columns() {
1601        let schema = Arc::new(new_test_schema());
1602        let meta = TableMetaBuilder::empty()
1603            .schema(schema)
1604            .primary_key_indices(vec![0])
1605            .engine("engine")
1606            .next_column_id(3)
1607            .build()
1608            .unwrap();
1609
1610        let new_meta = add_columns_to_meta(&meta);
1611        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1612
1613        let names: Vec<String> = new_meta
1614            .schema
1615            .column_schemas()
1616            .iter()
1617            .map(|column_schema| column_schema.name.clone())
1618            .collect();
1619        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1620        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1621        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1622    }
1623
1624    #[test]
1625    fn test_add_columns_multiple_times() {
1626        let schema = Arc::new(new_test_schema());
1627        let meta = TableMetaBuilder::empty()
1628            .schema(schema)
1629            .primary_key_indices(vec![0])
1630            .engine("engine")
1631            .next_column_id(3)
1632            .build()
1633            .unwrap();
1634
1635        let alter_kind = AlterKind::AddColumns {
1636            columns: vec![
1637                AddColumnRequest {
1638                    column_schema: ColumnSchema::new(
1639                        "col3",
1640                        ConcreteDataType::int32_datatype(),
1641                        true,
1642                    ),
1643                    is_key: true,
1644                    location: None,
1645                    add_if_not_exists: true,
1646                },
1647                AddColumnRequest {
1648                    column_schema: ColumnSchema::new(
1649                        "col3",
1650                        ConcreteDataType::int32_datatype(),
1651                        true,
1652                    ),
1653                    is_key: true,
1654                    location: None,
1655                    add_if_not_exists: true,
1656                },
1657            ],
1658        };
1659        let err = meta
1660            .builder_with_alter_kind("my_table", &alter_kind)
1661            .err()
1662            .unwrap();
1663        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1664    }
1665
1666    #[test]
1667    fn test_remove_columns() {
1668        let schema = Arc::new(new_test_schema());
1669        let meta = TableMetaBuilder::empty()
1670            .schema(schema.clone())
1671            .primary_key_indices(vec![0])
1672            .engine("engine")
1673            .next_column_id(3)
1674            .build()
1675            .unwrap();
1676        // Add more columns so we have enough candidate columns to remove.
1677        let meta = add_columns_to_meta(&meta);
1678
1679        let alter_kind = AlterKind::DropColumns {
1680            names: vec![String::from("col2"), String::from("my_field")],
1681        };
1682        let new_meta = meta
1683            .builder_with_alter_kind("my_table", &alter_kind)
1684            .unwrap()
1685            .build()
1686            .unwrap();
1687
1688        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1689
1690        let names: Vec<String> = new_meta
1691            .schema
1692            .column_schemas()
1693            .iter()
1694            .map(|column_schema| column_schema.name.clone())
1695            .collect();
1696        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1697        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1698        assert_eq!(&[1], &new_meta.value_indices[..]);
1699        assert_eq!(
1700            schema.timestamp_column(),
1701            new_meta.schema.timestamp_column()
1702        );
1703    }
1704
1705    #[test]
1706    fn test_remove_multiple_columns_before_timestamp() {
1707        let column_schemas = vec![
1708            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1709            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1710            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1711            ColumnSchema::new(
1712                "ts",
1713                ConcreteDataType::timestamp_millisecond_datatype(),
1714                false,
1715            )
1716            .with_time_index(true),
1717        ];
1718        let schema = Arc::new(
1719            SchemaBuilder::try_from(column_schemas)
1720                .unwrap()
1721                .version(123)
1722                .build()
1723                .unwrap(),
1724        );
1725        let meta = TableMetaBuilder::empty()
1726            .schema(schema.clone())
1727            .primary_key_indices(vec![1])
1728            .engine("engine")
1729            .next_column_id(4)
1730            .build()
1731            .unwrap();
1732
1733        // Remove columns in reverse order to test whether timestamp index is valid.
1734        let alter_kind = AlterKind::DropColumns {
1735            names: vec![String::from("col3"), String::from("col1")],
1736        };
1737        let new_meta = meta
1738            .builder_with_alter_kind("my_table", &alter_kind)
1739            .unwrap()
1740            .build()
1741            .unwrap();
1742
1743        let names: Vec<String> = new_meta
1744            .schema
1745            .column_schemas()
1746            .iter()
1747            .map(|column_schema| column_schema.name.clone())
1748            .collect();
1749        assert_eq!(&["col2", "ts"], &names[..]);
1750        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1751        assert_eq!(&[1], &new_meta.value_indices[..]);
1752        assert_eq!(
1753            schema.timestamp_column(),
1754            new_meta.schema.timestamp_column()
1755        );
1756    }
1757
1758    #[test]
1759    fn test_add_existing_column() {
1760        let schema = Arc::new(new_test_schema());
1761        let meta = TableMetaBuilder::empty()
1762            .schema(schema)
1763            .primary_key_indices(vec![0])
1764            .engine("engine")
1765            .next_column_id(3)
1766            .build()
1767            .unwrap();
1768
1769        let alter_kind = AlterKind::AddColumns {
1770            columns: vec![AddColumnRequest {
1771                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1772                is_key: false,
1773                location: None,
1774                add_if_not_exists: false,
1775            }],
1776        };
1777
1778        let err = meta
1779            .builder_with_alter_kind("my_table", &alter_kind)
1780            .err()
1781            .unwrap();
1782        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1783
1784        // Add if not exists
1785        let alter_kind = AlterKind::AddColumns {
1786            columns: vec![AddColumnRequest {
1787                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1788                is_key: true,
1789                location: None,
1790                add_if_not_exists: true,
1791            }],
1792        };
1793        let new_meta = meta
1794            .builder_with_alter_kind("my_table", &alter_kind)
1795            .unwrap()
1796            .build()
1797            .unwrap();
1798        assert_eq!(
1799            meta.schema.column_schemas(),
1800            new_meta.schema.column_schemas()
1801        );
1802        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1803    }
1804
1805    #[test]
1806    fn test_add_different_type_column() {
1807        let schema = Arc::new(new_test_schema());
1808        let meta = TableMetaBuilder::empty()
1809            .schema(schema)
1810            .primary_key_indices(vec![0])
1811            .engine("engine")
1812            .next_column_id(3)
1813            .build()
1814            .unwrap();
1815
1816        // Add if not exists, but different type.
1817        let alter_kind = AlterKind::AddColumns {
1818            columns: vec![AddColumnRequest {
1819                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1820                is_key: false,
1821                location: None,
1822                add_if_not_exists: true,
1823            }],
1824        };
1825        let err = meta
1826            .builder_with_alter_kind("my_table", &alter_kind)
1827            .err()
1828            .unwrap();
1829        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1830    }
1831
1832    #[test]
1833    fn test_add_invalid_column() {
1834        let schema = Arc::new(new_test_schema());
1835        let meta = TableMetaBuilder::empty()
1836            .schema(schema)
1837            .primary_key_indices(vec![0])
1838            .engine("engine")
1839            .next_column_id(3)
1840            .build()
1841            .unwrap();
1842
1843        // Not nullable and no default value.
1844        let alter_kind = AlterKind::AddColumns {
1845            columns: vec![AddColumnRequest {
1846                column_schema: ColumnSchema::new(
1847                    "weny",
1848                    ConcreteDataType::string_datatype(),
1849                    false,
1850                ),
1851                is_key: false,
1852                location: None,
1853                add_if_not_exists: false,
1854            }],
1855        };
1856
1857        let err = meta
1858            .builder_with_alter_kind("my_table", &alter_kind)
1859            .err()
1860            .unwrap();
1861        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1862    }
1863
1864    #[test]
1865    fn test_remove_unknown_column() {
1866        let schema = Arc::new(new_test_schema());
1867        let meta = TableMetaBuilder::empty()
1868            .schema(schema)
1869            .primary_key_indices(vec![0])
1870            .engine("engine")
1871            .next_column_id(3)
1872            .build()
1873            .unwrap();
1874
1875        let alter_kind = AlterKind::DropColumns {
1876            names: vec![String::from("unknown")],
1877        };
1878
1879        let err = meta
1880            .builder_with_alter_kind("my_table", &alter_kind)
1881            .err()
1882            .unwrap();
1883        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1884    }
1885
1886    #[test]
1887    fn test_change_unknown_column_data_type() {
1888        let schema = Arc::new(new_test_schema());
1889        let meta = TableMetaBuilder::empty()
1890            .schema(schema)
1891            .primary_key_indices(vec![0])
1892            .engine("engine")
1893            .next_column_id(3)
1894            .build()
1895            .unwrap();
1896
1897        let alter_kind = AlterKind::ModifyColumnTypes {
1898            columns: vec![ModifyColumnTypeRequest {
1899                column_name: "unknown".to_string(),
1900                target_type: ConcreteDataType::string_datatype(),
1901            }],
1902        };
1903
1904        let err = meta
1905            .builder_with_alter_kind("my_table", &alter_kind)
1906            .err()
1907            .unwrap();
1908        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1909    }
1910
1911    #[test]
1912    fn test_remove_key_column() {
1913        let schema = Arc::new(new_test_schema());
1914        let meta = TableMetaBuilder::empty()
1915            .schema(schema)
1916            .primary_key_indices(vec![0])
1917            .engine("engine")
1918            .next_column_id(3)
1919            .build()
1920            .unwrap();
1921
1922        // Remove column in primary key.
1923        let alter_kind = AlterKind::DropColumns {
1924            names: vec![String::from("col1")],
1925        };
1926
1927        let err = meta
1928            .builder_with_alter_kind("my_table", &alter_kind)
1929            .err()
1930            .unwrap();
1931        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1932
1933        // Remove timestamp column.
1934        let alter_kind = AlterKind::DropColumns {
1935            names: vec![String::from("ts")],
1936        };
1937
1938        let err = meta
1939            .builder_with_alter_kind("my_table", &alter_kind)
1940            .err()
1941            .unwrap();
1942        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1943    }
1944
1945    #[test]
1946    fn test_remove_partition_column() {
1947        let schema = Arc::new(new_test_schema());
1948        let meta = TableMetaBuilder::empty()
1949            .schema(schema)
1950            .primary_key_indices(vec![])
1951            .partition_key_indices(vec![0])
1952            .engine("engine")
1953            .next_column_id(3)
1954            .build()
1955            .unwrap();
1956        // Remove column in primary key.
1957        let alter_kind = AlterKind::DropColumns {
1958            names: vec![String::from("col1")],
1959        };
1960
1961        let err = meta
1962            .builder_with_alter_kind("my_table", &alter_kind)
1963            .err()
1964            .unwrap();
1965        assert_matches!(err, Error::RemovePartitionColumn { .. });
1966    }
1967
1968    #[test]
1969    fn test_change_key_column_data_type() {
1970        let schema = Arc::new(new_test_schema());
1971        let meta = TableMetaBuilder::empty()
1972            .schema(schema)
1973            .primary_key_indices(vec![0])
1974            .engine("engine")
1975            .next_column_id(3)
1976            .build()
1977            .unwrap();
1978
1979        // Remove column in primary key.
1980        let alter_kind = AlterKind::ModifyColumnTypes {
1981            columns: vec![ModifyColumnTypeRequest {
1982                column_name: "col1".to_string(),
1983                target_type: ConcreteDataType::string_datatype(),
1984            }],
1985        };
1986
1987        let err = meta
1988            .builder_with_alter_kind("my_table", &alter_kind)
1989            .err()
1990            .unwrap();
1991        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1992
1993        // Remove timestamp column.
1994        let alter_kind = AlterKind::ModifyColumnTypes {
1995            columns: vec![ModifyColumnTypeRequest {
1996                column_name: "ts".to_string(),
1997                target_type: ConcreteDataType::string_datatype(),
1998            }],
1999        };
2000
2001        let err = meta
2002            .builder_with_alter_kind("my_table", &alter_kind)
2003            .err()
2004            .unwrap();
2005        assert_eq!(StatusCode::InvalidArguments, err.status_code());
2006    }
2007
2008    #[test]
2009    fn test_alloc_new_column() {
2010        let schema = Arc::new(new_test_schema());
2011        let mut meta = TableMetaBuilder::empty()
2012            .schema(schema)
2013            .primary_key_indices(vec![0])
2014            .engine("engine")
2015            .next_column_id(3)
2016            .build()
2017            .unwrap();
2018        assert_eq!(3, meta.next_column_id);
2019
2020        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
2021        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
2022
2023        assert_eq!(4, meta.next_column_id);
2024        assert_eq!(column_schema.name, desc.name);
2025    }
2026
2027    #[test]
2028    fn test_add_columns_with_location() {
2029        let schema = Arc::new(new_test_schema());
2030        let meta = TableMetaBuilder::empty()
2031            .schema(schema)
2032            .primary_key_indices(vec![0])
2033            // partition col: col1, col2
2034            .partition_key_indices(vec![0, 2])
2035            .engine("engine")
2036            .next_column_id(3)
2037            .build()
2038            .unwrap();
2039
2040        let new_meta = add_columns_to_meta_with_location(&meta);
2041        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2042
2043        let names: Vec<String> = new_meta
2044            .schema
2045            .column_schemas()
2046            .iter()
2047            .map(|column_schema| column_schema.name.clone())
2048            .collect();
2049        assert_eq!(
2050            &[
2051                "my_tag_first",               // primary key column
2052                "col1",                       // partition column
2053                "ts",                         // timestamp column
2054                "yet_another_field_after_ts", // primary key column
2055                "my_field_after_ts",          // value column
2056                "col2",                       // partition column
2057            ],
2058            &names[..]
2059        );
2060        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2061        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2062        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2063    }
2064
2065    #[test]
2066    fn test_modify_column_fulltext_options() {
2067        let schema = Arc::new(new_test_schema());
2068        let meta = TableMetaBuilder::empty()
2069            .schema(schema)
2070            .primary_key_indices(vec![0])
2071            .engine("engine")
2072            .next_column_id(3)
2073            .build()
2074            .unwrap();
2075
2076        let alter_kind = AlterKind::SetIndexes {
2077            options: vec![SetIndexOption::Fulltext {
2078                column_name: "col1".to_string(),
2079                options: FulltextOptions::default(),
2080            }],
2081        };
2082        let err = meta
2083            .builder_with_alter_kind("my_table", &alter_kind)
2084            .err()
2085            .unwrap();
2086        assert_eq!(
2087            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2088            err.to_string()
2089        );
2090
2091        // Add a string column and make it fulltext indexed
2092        let new_meta = add_columns_to_meta_with_location(&meta);
2093        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2094
2095        let alter_kind = AlterKind::SetIndexes {
2096            options: vec![SetIndexOption::Fulltext {
2097                column_name: "my_tag_first".to_string(),
2098                options: FulltextOptions::new_unchecked(
2099                    true,
2100                    FulltextAnalyzer::Chinese,
2101                    true,
2102                    FulltextBackend::Bloom,
2103                    1000,
2104                    0.01,
2105                ),
2106            }],
2107        };
2108        let new_meta = new_meta
2109            .builder_with_alter_kind("my_table", &alter_kind)
2110            .unwrap()
2111            .build()
2112            .unwrap();
2113        let column_schema = new_meta
2114            .schema
2115            .column_schema_by_name("my_tag_first")
2116            .unwrap();
2117        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2118        assert!(fulltext_options.enable);
2119        assert_eq!(
2120            datatypes::schema::FulltextAnalyzer::Chinese,
2121            fulltext_options.analyzer
2122        );
2123        assert!(fulltext_options.case_sensitive);
2124
2125        let alter_kind = AlterKind::UnsetIndexes {
2126            options: vec![UnsetIndexOption::Fulltext {
2127                column_name: "my_tag_first".to_string(),
2128            }],
2129        };
2130        let new_meta = new_meta
2131            .builder_with_alter_kind("my_table", &alter_kind)
2132            .unwrap()
2133            .build()
2134            .unwrap();
2135        let column_schema = new_meta
2136            .schema
2137            .column_schema_by_name("my_tag_first")
2138            .unwrap();
2139        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2140        assert!(!fulltext_options.enable);
2141    }
2142}