table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Deserializer, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39    TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
/// Numeric identifier of a table (see [`TableIdent::table_id`]).
pub type TableId = u32;
/// Version number of a table's metadata (see [`TableIdent::version`]).
pub type TableVersion = u64;
45
/// Indicates whether and how a filter expression can be handled by a
/// Table for table scans.
///
/// The variants correspond one-to-one to DataFusion's
/// `TableProviderFilterPushDown`; `From` conversions exist in both directions.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushDownType {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to help minimise the data retrieved,
    /// but the provider cannot guarantee that all returned tuples
    /// satisfy the filter. The Filter plan node containing this expression
    /// will be preserved.
    Inexact,
    /// The provider guarantees that all returned data satisfies this
    /// filter expression. The Filter plan node containing this expression
    /// will be removed.
    Exact,
}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
/// Indicates the type of this table for metadata/catalog purposes.
///
/// Its `Display` form is `BASE TABLE`, `VIEW` or `TEMPORARY`, and it converts
/// into `datafusion::datasource::TableType` variant for variant.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableType {
    /// An ordinary physical table.
    Base,
    /// A non-materialised table that itself uses a query internally to provide data.
    View,
    /// A transient table.
    Temporary,
}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
104impl From<TableType> for datafusion::datasource::TableType {
105    fn from(t: TableType) -> datafusion::datasource::TableType {
106        match t {
107            TableType::Base => datafusion::datasource::TableType::Base,
108            TableType::View => datafusion::datasource::TableType::View,
109            TableType::Temporary => datafusion::datasource::TableType::Temporary,
110        }
111    }
112}
113
/// Identifier of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
    /// Unique id of this table.
    pub table_id: TableId,
    /// Version of the table, bumped whenever metadata (such as the schema) of
    /// the table is changed.
    pub version: TableVersion,
}
123
/// The table metadata.
///
/// A `TableMetaBuilder` is derived for this struct; the `#[builder(...)]`
/// attributes below define what each field falls back to when it is not set
/// on the builder.
///
/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
#[builder(pattern = "mutable", custom_constructor)]
pub struct TableMeta {
    /// Schema of the table.
    pub schema: SchemaRef,
    /// The indices of columns in primary key. Note that the index of timestamp column
    /// is not included in these indices.
    pub primary_key_indices: Vec<usize>,
    /// Indices of the value columns. When not set on the builder this defaults
    /// to every column index of `schema` that is not in `primary_key_indices`.
    #[builder(default = "self.default_value_indices()?")]
    pub value_indices: Vec<usize>,
    /// Engine of the table; defaults to an empty string.
    // NOTE(review): presumably the engine name — confirm against callers.
    #[builder(default, setter(into))]
    pub engine: String,
    /// The id `alloc_new_column` hands out to the next allocated column.
    pub next_column_id: ColumnId,
    /// Table options.
    #[builder(default)]
    pub options: TableOptions,
    /// Creation time; defaults to `Utc::now()` at build time.
    #[builder(default = "Utc::now()")]
    pub created_on: DateTime<Utc>,
    /// Last update time. Defaults to the builder's `created_on` when that is
    /// set, otherwise to `DateTime::default()`.
    #[builder(default = "self.default_updated_on()")]
    pub updated_on: DateTime<Utc>,
    /// Indices (into the schema's columns) of the partition columns; defaults to empty.
    #[builder(default = "Vec::new()")]
    pub partition_key_indices: Vec<usize>,
    /// Column ids; defaults to empty.
    // NOTE(review): how these line up with the schema's columns is not visible here — confirm.
    #[builder(default = "Vec::new()")]
    pub column_ids: Vec<ColumnId>,
}
151
impl TableMetaBuilder {
    /// Returns a builder with every field unset.
    ///
    /// Only compiled for tests or with the `testing` feature.
    ///
    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
    #[cfg(any(test, feature = "testing"))]
    pub fn empty() -> Self {
        Self {
            schema: None,
            primary_key_indices: None,
            value_indices: None,
            engine: None,
            next_column_id: None,
            options: None,
            created_on: None,
            updated_on: None,
            partition_key_indices: None,
            column_ids: None,
        }
    }
}
170
171impl TableMetaBuilder {
172    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
173        match (&self.primary_key_indices, &self.schema) {
174            (Some(v), Some(schema)) => {
175                let column_schemas = schema.column_schemas();
176                Ok((0..column_schemas.len())
177                    .filter(|idx| !v.contains(idx))
178                    .collect())
179            }
180            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
181        }
182    }
183
184    fn default_updated_on(&self) -> DateTime<Utc> {
185        self.created_on.unwrap_or_default()
186    }
187
188    pub fn new_external_table() -> Self {
189        Self {
190            schema: None,
191            primary_key_indices: Some(Vec::new()),
192            value_indices: Some(Vec::new()),
193            engine: None,
194            next_column_id: Some(0),
195            options: None,
196            created_on: None,
197            updated_on: None,
198            partition_key_indices: None,
199            column_ids: None,
200        }
201    }
202}
203
/// The result after splitting requests by column location info.
struct SplitResult<'a> {
    /// Column requests that should be added at the first position.
    columns_at_first: Vec<&'a AddColumnRequest>,
    /// Column requests that should be added after an already existing column,
    /// keyed by the name of that existing column.
    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
    /// Column requests that should be added at the last position.
    columns_at_last: Vec<&'a AddColumnRequest>,
    /// Names of all columns that should be added.
    column_names: Vec<String>,
}
215
216impl TableMeta {
217    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
218        let columns_schemas = &self.schema.column_schemas();
219        self.primary_key_indices
220            .iter()
221            .map(|idx| &columns_schemas[*idx].name)
222    }
223
224    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
225        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
226        let columns_schemas = self.schema.column_schemas();
227        let primary_key_indices = &self.primary_key_indices;
228        columns_schemas
229            .iter()
230            .enumerate()
231            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
232            .map(|(_, cs)| &cs.name)
233    }
234
235    pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
236        let columns_schemas = &self.schema.column_schemas();
237        self.partition_key_indices
238            .iter()
239            .map(|idx| &columns_schemas[*idx].name)
240    }
241
242    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
243    ///
244    /// The returned builder would derive the next column id of this meta.
245    pub fn builder_with_alter_kind(
246        &self,
247        table_name: &str,
248        alter_kind: &AlterKind,
249    ) -> Result<TableMetaBuilder> {
250        let mut builder = match alter_kind {
251            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
252            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
253            AlterKind::ModifyColumnTypes { columns } => {
254                self.modify_column_types(table_name, columns)
255            }
256            // No need to rebuild table meta when renaming tables.
257            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
258            AlterKind::SetTableOptions { options } => self.set_table_options(options),
259            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
260            AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
261            AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
262            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
263            AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
264        }?;
265        let _ = builder.updated_on(Utc::now());
266        Ok(builder)
267    }
268
269    /// Creates a [TableMetaBuilder] with modified table options.
270    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
271        let mut new_options = self.options.clone();
272
273        for request in requests {
274            match request {
275                SetRegionOption::Ttl(new_ttl) => {
276                    new_options.ttl = *new_ttl;
277                }
278                SetRegionOption::Twsc(key, value) => {
279                    if !value.is_empty() {
280                        new_options.extra_options.insert(key.clone(), value.clone());
281                        // Ensure node restart correctly.
282                        new_options.extra_options.insert(
283                            COMPACTION_TYPE.to_string(),
284                            COMPACTION_TYPE_TWCS.to_string(),
285                        );
286                    } else {
287                        // Invalidate the previous change option if an empty value has been set.
288                        new_options.extra_options.remove(key.as_str());
289                    }
290                }
291                SetRegionOption::Format(value) => {
292                    new_options
293                        .extra_options
294                        .insert(SST_FORMAT_KEY.to_string(), value.clone());
295                }
296            }
297        }
298        let mut builder = self.new_meta_builder();
299        builder.options(new_options);
300
301        Ok(builder)
302    }
303
304    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
305        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
306        self.set_table_options(&requests)
307    }
308
309    fn set_indexes(
310        &self,
311        table_name: &str,
312        requests: &[SetIndexOption],
313    ) -> Result<TableMetaBuilder> {
314        let table_schema = &self.schema;
315        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
316        for request in requests {
317            let column_name = request.column_name();
318            table_schema
319                .column_index_by_name(column_name)
320                .with_context(|| error::ColumnNotExistsSnafu {
321                    column_name,
322                    table_name,
323                })?;
324            set_index_options
325                .entry(column_name)
326                .or_default()
327                .push(request);
328        }
329
330        let mut meta_builder = self.new_meta_builder();
331        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
332        for mut column in table_schema.column_schemas().iter().cloned() {
333            if let Some(request) = set_index_options.get(column.name.as_str()) {
334                for request in request {
335                    self.set_index(&mut column, request)?;
336                }
337            }
338            columns.push(column);
339        }
340
341        let mut builder = SchemaBuilder::try_from_columns(columns)
342            .with_context(|_| error::SchemaBuildSnafu {
343                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
344            })?
345            .version(table_schema.version() + 1);
346
347        for (k, v) in table_schema.metadata().iter() {
348            builder = builder.add_metadata(k, v);
349        }
350
351        let new_schema = builder.build().with_context(|_| {
352            let column_names = requests
353                .iter()
354                .map(|request| request.column_name())
355                .collect::<Vec<_>>();
356            error::SchemaBuildSnafu {
357                msg: format!(
358                    "Table {table_name} cannot set index options with columns {column_names:?}",
359                ),
360            }
361        })?;
362        let _ = meta_builder
363            .schema(Arc::new(new_schema))
364            .primary_key_indices(self.primary_key_indices.clone());
365
366        Ok(meta_builder)
367    }
368
369    fn unset_indexes(
370        &self,
371        table_name: &str,
372        requests: &[UnsetIndexOption],
373    ) -> Result<TableMetaBuilder> {
374        let table_schema = &self.schema;
375        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
376        for request in requests {
377            let column_name = request.column_name();
378            table_schema
379                .column_index_by_name(column_name)
380                .with_context(|| error::ColumnNotExistsSnafu {
381                    column_name,
382                    table_name,
383                })?;
384            set_index_options
385                .entry(column_name)
386                .or_default()
387                .push(request);
388        }
389
390        let mut meta_builder = self.new_meta_builder();
391        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
392        for mut column in table_schema.column_schemas().iter().cloned() {
393            if let Some(request) = set_index_options.get(column.name.as_str()) {
394                for request in request {
395                    self.unset_index(&mut column, request)?;
396                }
397            }
398            columns.push(column);
399        }
400
401        let mut builder = SchemaBuilder::try_from_columns(columns)
402            .with_context(|_| error::SchemaBuildSnafu {
403                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
404            })?
405            .version(table_schema.version() + 1);
406
407        for (k, v) in table_schema.metadata().iter() {
408            builder = builder.add_metadata(k, v);
409        }
410
411        let new_schema = builder.build().with_context(|_| {
412            let column_names = requests
413                .iter()
414                .map(|request| request.column_name())
415                .collect::<Vec<_>>();
416            error::SchemaBuildSnafu {
417                msg: format!(
418                    "Table {table_name} cannot set index options with columns {column_names:?}",
419                ),
420            }
421        })?;
422        let _ = meta_builder
423            .schema(Arc::new(new_schema))
424            .primary_key_indices(self.primary_key_indices.clone());
425
426        Ok(meta_builder)
427    }
428
429    fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
430        match request {
431            SetIndexOption::Fulltext {
432                column_name,
433                options,
434            } => {
435                ensure!(
436                    column_schema.data_type.is_string(),
437                    error::InvalidColumnOptionSnafu {
438                        column_name,
439                        msg: "FULLTEXT index only supports string type",
440                    }
441                );
442
443                let current_fulltext_options = column_schema
444                    .fulltext_options()
445                    .context(error::SetFulltextOptionsSnafu { column_name })?;
446                set_column_fulltext_options(
447                    column_schema,
448                    column_name,
449                    options,
450                    current_fulltext_options,
451                )?;
452            }
453            SetIndexOption::Inverted { column_name } => {
454                debug_assert_eq!(column_schema.name, *column_name);
455                column_schema.set_inverted_index(true);
456            }
457            SetIndexOption::Skipping {
458                column_name,
459                options,
460            } => {
461                set_column_skipping_index_options(column_schema, column_name, options)?;
462            }
463        }
464
465        Ok(())
466    }
467
468    fn unset_index(
469        &self,
470        column_schema: &mut ColumnSchema,
471        request: &UnsetIndexOption,
472    ) -> Result<()> {
473        match request {
474            UnsetIndexOption::Fulltext { column_name } => {
475                let current_fulltext_options = column_schema
476                    .fulltext_options()
477                    .context(error::SetFulltextOptionsSnafu { column_name })?;
478                unset_column_fulltext_options(
479                    column_schema,
480                    column_name,
481                    current_fulltext_options.clone(),
482                )?
483            }
484            UnsetIndexOption::Inverted { .. } => {
485                column_schema.set_inverted_index(false);
486            }
487            UnsetIndexOption::Skipping { column_name } => {
488                unset_column_skipping_index_options(column_schema, column_name)?;
489            }
490        }
491
492        Ok(())
493    }
494
495    // TODO(yingwen): Remove this.
496    /// Allocate a new column for the table.
497    ///
498    /// This method would bump the `next_column_id` of the meta.
499    pub fn alloc_new_column(
500        &mut self,
501        table_name: &str,
502        new_column: &ColumnSchema,
503    ) -> Result<ColumnDescriptor> {
504        let desc = ColumnDescriptorBuilder::new(
505            self.next_column_id as ColumnId,
506            &new_column.name,
507            new_column.data_type.clone(),
508        )
509        .is_nullable(new_column.is_nullable())
510        .default_constraint(new_column.default_constraint().cloned())
511        .build()
512        .context(error::BuildColumnDescriptorSnafu {
513            table_name,
514            column_name: &new_column.name,
515        })?;
516
517        // Bump next column id.
518        self.next_column_id += 1;
519
520        Ok(desc)
521    }
522
523    /// Create a [`TableMetaBuilder`] from the current TableMeta.
524    fn new_meta_builder(&self) -> TableMetaBuilder {
525        let mut builder = TableMetaBuilder::from(self);
526        // Manually remove value_indices.
527        builder.value_indices = None;
528        builder
529    }
530
    // TODO(yingwen): Tests add if not exists.
    /// Creates a [TableMetaBuilder] whose schema additionally contains the
    /// columns described by `requests`.
    ///
    /// Validation, per request:
    /// - If the column already exists, the request must have
    ///   `add_if_not_exists` set and must use the same data type as the
    ///   existing column; such a request is then skipped.
    /// - Otherwise the column must not be requested twice and must be nullable
    ///   or carry a default constraint.
    ///
    /// The remaining requests are split by their location info and placed as
    /// follows: FIRST columns (in reverse request order), then the existing
    /// columns in their original order, each directly followed by its AFTER
    /// columns (in reverse order), then the columns without location info.
    /// Primary key and partition key indices are recomputed for the new column
    /// order, the schema version is bumped by one and the schema metadata is
    /// carried over.
    fn add_columns(
        &self,
        table_name: &str,
        requests: &[AddColumnRequest],
    ) -> Result<TableMetaBuilder> {
        let table_schema = &self.schema;
        let mut meta_builder = self.new_meta_builder();
        let original_primary_key_indices: HashSet<&usize> =
            self.primary_key_indices.iter().collect();

        // `names` tracks new column names seen so far; `new_columns` collects the
        // requests that actually add a column.
        let mut names = HashSet::with_capacity(requests.len());
        let mut new_columns = Vec::with_capacity(requests.len());
        for col_to_add in requests {
            if let Some(column_schema) =
                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
            {
                // If the column already exists.
                ensure!(
                    col_to_add.add_if_not_exists,
                    error::ColumnExistsSnafu {
                        table_name,
                        column_name: &col_to_add.column_schema.name
                    },
                );

                // Checks if the type is the same
                ensure!(
                    column_schema.data_type == col_to_add.column_schema.data_type,
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "column {} already exists with different type {:?}",
                            col_to_add.column_schema.name, column_schema.data_type,
                        ),
                    }
                );
            } else {
                // A new column.
                // Ensures we only add a column once.
                ensure!(
                    names.insert(&col_to_add.column_schema.name),
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "add column {} more than once",
                            col_to_add.column_schema.name
                        ),
                    }
                );

                // A new column must be nullable or have a default value.
                ensure!(
                    col_to_add.column_schema.is_nullable()
                        || col_to_add.column_schema.default_constraint().is_some(),
                    error::InvalidAlterRequestSnafu {
                        table: table_name,
                        err: format!(
                            "no default value for column {}",
                            col_to_add.column_schema.name
                        ),
                    },
                );

                new_columns.push(col_to_add.clone());
            }
        }
        // From here on only the requests that really add a column are considered.
        let requests = &new_columns[..];

        let SplitResult {
            columns_at_first,
            columns_at_after,
            columns_at_last,
            column_names,
        } = self.split_requests_by_column_location(table_name, requests)?;
        // Primary key indices are recorded as positions in `columns`, so each index
        // must be pushed right before its column is pushed.
        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
        // add new columns with FIRST, and in reverse order of requests.
        columns_at_first.iter().rev().for_each(|request| {
            if request.is_key {
                // If a key column is added, we also need to store its index in primary_key_indices.
                primary_key_indices.push(columns.len());
            }
            columns.push(request.column_schema.clone());
        });
        // add existed columns in original order and handle new columns with AFTER.
        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
            if original_primary_key_indices.contains(&index) {
                primary_key_indices.push(columns.len());
            }
            columns.push(column_schema.clone());
            if let Some(requests) = columns_at_after.get(&column_schema.name) {
                requests.iter().rev().for_each(|request| {
                    if request.is_key {
                        // If a key column is added, we also need to store its index in primary_key_indices.
                        primary_key_indices.push(columns.len());
                    }
                    columns.push(request.column_schema.clone());
                });
            }
        }
        // add new columns without location info to last.
        columns_at_last.iter().for_each(|request| {
            if request.is_key {
                // If a key column is added, we also need to store its index in primary_key_indices.
                primary_key_indices.push(columns.len());
            }
            columns.push(request.column_schema.clone());
        });

        let mut builder = SchemaBuilder::try_from(columns)
            .with_context(|_| error::SchemaBuildSnafu {
                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
            })?
            // Also bump the schema version.
            .version(table_schema.version() + 1);
        // Carry over the metadata of the current schema.
        for (k, v) in table_schema.metadata().iter() {
            builder = builder.add_metadata(k, v);
        }
        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
        })?;

        // Partition columns keep their names but may have moved; look up their new indices.
        let partition_key_indices = self
            .partition_key_indices
            .iter()
            .map(|idx| table_schema.column_name_by_index(*idx))
            // This unwrap is safe since we only add new columns.
            .map(|name| new_schema.column_index_by_name(name).unwrap())
            .collect();

        // value_indices would be generated automatically.
        let _ = meta_builder
            .schema(Arc::new(new_schema))
            .primary_key_indices(primary_key_indices)
            .partition_key_indices(partition_key_indices);

        Ok(meta_builder)
    }
669
670    fn remove_columns(
671        &self,
672        table_name: &str,
673        column_names: &[String],
674    ) -> Result<TableMetaBuilder> {
675        let table_schema = &self.schema;
676        let column_names: HashSet<_> = column_names.iter().collect();
677        let mut meta_builder = self.new_meta_builder();
678
679        let timestamp_index = table_schema.timestamp_index();
680        // Check whether columns are existing and not in primary key index.
681        for column_name in &column_names {
682            if let Some(index) = table_schema.column_index_by_name(column_name) {
683                // This is a linear search, but since there won't be too much columns, the performance should
684                // be acceptable.
685                ensure!(
686                    !self.primary_key_indices.contains(&index),
687                    error::RemoveColumnInIndexSnafu {
688                        column_name: *column_name,
689                        table_name,
690                    }
691                );
692
693                ensure!(
694                    !self.partition_key_indices.contains(&index),
695                    error::RemovePartitionColumnSnafu {
696                        column_name: *column_name,
697                        table_name,
698                    }
699                );
700
701                if let Some(ts_index) = timestamp_index {
702                    // Not allowed to remove column in timestamp index.
703                    ensure!(
704                        index != ts_index,
705                        error::RemoveColumnInIndexSnafu {
706                            column_name: table_schema.column_name_by_index(ts_index),
707                            table_name,
708                        }
709                    );
710                }
711            } else {
712                return error::ColumnNotExistsSnafu {
713                    column_name: *column_name,
714                    table_name,
715                }
716                .fail()?;
717            }
718        }
719
720        // Collect columns after removal.
721        let columns: Vec<_> = table_schema
722            .column_schemas()
723            .iter()
724            .filter(|column_schema| !column_names.contains(&column_schema.name))
725            .cloned()
726            .collect();
727
728        let mut builder = SchemaBuilder::try_from_columns(columns)
729            .with_context(|_| error::SchemaBuildSnafu {
730                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
731            })?
732            // Also bump the schema version.
733            .version(table_schema.version() + 1);
734        for (k, v) in table_schema.metadata().iter() {
735            builder = builder.add_metadata(k, v);
736        }
737        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
738            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
739        })?;
740
741        // Rebuild the indices of primary key columns.
742        let primary_key_indices = self
743            .primary_key_indices
744            .iter()
745            .map(|idx| table_schema.column_name_by_index(*idx))
746            // This unwrap is safe since we don't allow removing a primary key column.
747            .map(|name| new_schema.column_index_by_name(name).unwrap())
748            .collect();
749
750        let partition_key_indices = self
751            .partition_key_indices
752            .iter()
753            .map(|idx| table_schema.column_name_by_index(*idx))
754            // This unwrap is safe since we don't allow removing a partition key column.
755            .map(|name| new_schema.column_index_by_name(name).unwrap())
756            .collect();
757
758        let _ = meta_builder
759            .schema(Arc::new(new_schema))
760            .primary_key_indices(primary_key_indices)
761            .partition_key_indices(partition_key_indices);
762
763        Ok(meta_builder)
764    }
765
766    fn modify_column_types(
767        &self,
768        table_name: &str,
769        requests: &[ModifyColumnTypeRequest],
770    ) -> Result<TableMetaBuilder> {
771        let table_schema = &self.schema;
772        let mut meta_builder = self.new_meta_builder();
773
774        let mut modify_column_types = HashMap::with_capacity(requests.len());
775        let timestamp_index = table_schema.timestamp_index();
776
777        for col_to_change in requests {
778            let change_column_name = &col_to_change.column_name;
779
780            let index = table_schema
781                .column_index_by_name(change_column_name)
782                .with_context(|| error::ColumnNotExistsSnafu {
783                    column_name: change_column_name,
784                    table_name,
785                })?;
786
787            let column = &table_schema.column_schemas()[index];
788
789            ensure!(
790                !self.primary_key_indices.contains(&index),
791                error::InvalidAlterRequestSnafu {
792                    table: table_name,
793                    err: format!(
794                        "Not allowed to change primary key index column '{}'",
795                        column.name
796                    )
797                }
798            );
799
800            if let Some(ts_index) = timestamp_index {
801                // Not allowed to change column datatype in timestamp index.
802                ensure!(
803                    index != ts_index,
804                    error::InvalidAlterRequestSnafu {
805                        table: table_name,
806                        err: format!(
807                            "Not allowed to change timestamp index column '{}' datatype",
808                            column.name
809                        )
810                    }
811                );
812            }
813
814            ensure!(
815                modify_column_types
816                    .insert(&col_to_change.column_name, col_to_change)
817                    .is_none(),
818                error::InvalidAlterRequestSnafu {
819                    table: table_name,
820                    err: format!(
821                        "change column datatype {} more than once",
822                        col_to_change.column_name
823                    ),
824                }
825            );
826
827            ensure!(
828                column
829                    .data_type
830                    .can_arrow_type_cast_to(&col_to_change.target_type),
831                error::InvalidAlterRequestSnafu {
832                    table: table_name,
833                    err: format!(
834                        "column '{}' cannot be cast automatically to type '{}'",
835                        col_to_change.column_name, col_to_change.target_type,
836                    ),
837                }
838            );
839
840            ensure!(
841                column.is_nullable(),
842                error::InvalidAlterRequestSnafu {
843                    table: table_name,
844                    err: format!(
845                        "column '{}' must be nullable to ensure safe conversion.",
846                        col_to_change.column_name,
847                    ),
848                }
849            );
850        }
851        // Collect columns after changed.
852
853        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
854        for mut column in table_schema.column_schemas().iter().cloned() {
855            if let Some(change_column) = modify_column_types.get(&column.name) {
856                column.data_type = change_column.target_type.clone();
857                let new_default = if let Some(default_value) = column.default_constraint() {
858                    Some(
859                        default_value
860                            .cast_to_datatype(&change_column.target_type)
861                            .with_context(|_| error::CastDefaultValueSnafu {
862                                reason: format!(
863                                    "Failed to cast default value from {:?} to type {:?}",
864                                    default_value, &change_column.target_type
865                                ),
866                            })?,
867                    )
868                } else {
869                    None
870                };
871                column = column
872                    .clone()
873                    .with_default_constraint(new_default.clone())
874                    .with_context(|_| error::CastDefaultValueSnafu {
875                        reason: format!("Failed to set new default: {:?}", new_default),
876                    })?;
877            }
878            columns.push(column)
879        }
880
881        let mut builder = SchemaBuilder::try_from_columns(columns)
882            .with_context(|_| error::SchemaBuildSnafu {
883                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
884            })?
885            // Also bump the schema version.
886            .version(table_schema.version() + 1);
887        for (k, v) in table_schema.metadata().iter() {
888            builder = builder.add_metadata(k, v);
889        }
890        let new_schema = builder.build().with_context(|_| {
891            let column_names: Vec<_> = requests
892                .iter()
893                .map(|request| &request.column_name)
894                .collect();
895
896            error::SchemaBuildSnafu {
897                msg: format!(
898                    "Table {table_name} cannot change datatype with columns {column_names:?}"
899                ),
900            }
901        })?;
902
903        let _ = meta_builder
904            .schema(Arc::new(new_schema))
905            .primary_key_indices(self.primary_key_indices.clone());
906
907        Ok(meta_builder)
908    }
909
910    /// Split requests into different groups using column location info.
911    fn split_requests_by_column_location<'a>(
912        &self,
913        table_name: &str,
914        requests: &'a [AddColumnRequest],
915    ) -> Result<SplitResult<'a>> {
916        let table_schema = &self.schema;
917        let mut columns_at_first = Vec::new();
918        let mut columns_at_after = HashMap::new();
919        let mut columns_at_last = Vec::new();
920        let mut column_names = Vec::with_capacity(requests.len());
921        for request in requests {
922            // Check whether columns to add are already existing.
923            let column_name = &request.column_schema.name;
924            column_names.push(column_name.clone());
925            ensure!(
926                table_schema.column_schema_by_name(column_name).is_none(),
927                error::ColumnExistsSnafu {
928                    column_name,
929                    table_name,
930                }
931            );
932            match request.location.as_ref() {
933                Some(AddColumnLocation::First) => {
934                    columns_at_first.push(request);
935                }
936                Some(AddColumnLocation::After { column_name }) => {
937                    ensure!(
938                        table_schema.column_schema_by_name(column_name).is_some(),
939                        error::ColumnNotExistsSnafu {
940                            column_name,
941                            table_name,
942                        }
943                    );
944                    columns_at_after
945                        .entry(column_name.clone())
946                        .or_insert(Vec::new())
947                        .push(request);
948                }
949                None => {
950                    columns_at_last.push(request);
951                }
952            }
953        }
954        Ok(SplitResult {
955            columns_at_first,
956            columns_at_after,
957            columns_at_last,
958            column_names,
959        })
960    }
961
962    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
963        let table_schema = &self.schema;
964        let mut meta_builder = self.new_meta_builder();
965        let mut columns = Vec::with_capacity(table_schema.num_columns());
966        for column_schema in table_schema.column_schemas() {
967            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
968                // Drop default constraint.
969                ensure!(
970                    column_schema.default_constraint().is_some(),
971                    error::InvalidAlterRequestSnafu {
972                        table: table_name,
973                        err: format!("column {name} does not have a default value"),
974                    }
975                );
976                if !column_schema.is_nullable() {
977                    return error::InvalidAlterRequestSnafu {
978                        table: table_name,
979                        err: format!(
980                            "column {name} is not nullable and `default` cannot be dropped",
981                        ),
982                    }
983                    .fail();
984                }
985                let new_column_schema = column_schema.clone();
986                let new_column_schema = new_column_schema
987                    .with_default_constraint(None)
988                    .with_context(|_| error::SchemaBuildSnafu {
989                        msg: format!("Table {table_name} cannot drop default values"),
990                    })?;
991                columns.push(new_column_schema);
992            } else {
993                columns.push(column_schema.clone());
994            }
995        }
996
997        let mut builder = SchemaBuilder::try_from_columns(columns)
998            .with_context(|_| error::SchemaBuildSnafu {
999                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1000            })?
1001            // Also bump the schema version.
1002            .version(table_schema.version() + 1);
1003        for (k, v) in table_schema.metadata().iter() {
1004            builder = builder.add_metadata(k, v);
1005        }
1006        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1007            msg: format!("Table {table_name} cannot drop default values"),
1008        })?;
1009
1010        let _ = meta_builder.schema(Arc::new(new_schema));
1011
1012        Ok(meta_builder)
1013    }
1014
1015    fn set_defaults(
1016        &self,
1017        table_name: &str,
1018        set_defaults: &[SetDefaultRequest],
1019    ) -> Result<TableMetaBuilder> {
1020        let table_schema = &self.schema;
1021        let mut meta_builder = self.new_meta_builder();
1022        let mut columns = Vec::with_capacity(table_schema.num_columns());
1023        for column_schema in table_schema.column_schemas() {
1024            if let Some(set_default) = set_defaults
1025                .iter()
1026                .find(|s| s.column_name == column_schema.name)
1027            {
1028                let new_column_schema = column_schema.clone();
1029                let new_column_schema = new_column_schema
1030                    .with_default_constraint(set_default.default_constraint.clone())
1031                    .with_context(|_| error::SchemaBuildSnafu {
1032                        msg: format!("Table {table_name} cannot set default values"),
1033                    })?;
1034                columns.push(new_column_schema);
1035            } else {
1036                columns.push(column_schema.clone());
1037            }
1038        }
1039
1040        let mut builder = SchemaBuilder::try_from_columns(columns)
1041            .with_context(|_| error::SchemaBuildSnafu {
1042                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1043            })?
1044            // Also bump the schema version.
1045            .version(table_schema.version() + 1);
1046        for (k, v) in table_schema.metadata().iter() {
1047            builder = builder.add_metadata(k, v);
1048        }
1049        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1050            msg: format!("Table {table_name} cannot set default values"),
1051        })?;
1052
1053        let _ = meta_builder.schema(Arc::new(new_schema));
1054
1055        Ok(meta_builder)
1056    }
1057}
1058
/// Information describing a table: its identity, names and [`TableMeta`].
///
/// The derived builder uses the "owned" pattern, i.e. its setters take and return
/// the builder by value.
#[derive(Clone, Debug, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct TableInfo {
    /// Id and version of the table.
    #[builder(default, setter(into))]
    pub ident: TableIdent,
    /// Name of the table.
    #[builder(setter(into))]
    pub name: String,
    /// Comment of the table.
    #[builder(default, setter(into))]
    pub desc: Option<String>,
    /// Name of the catalog; the builder defaults it to `DEFAULT_CATALOG_NAME`.
    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
    pub catalog_name: String,
    /// Name of the schema; the builder defaults it to `DEFAULT_SCHEMA_NAME`.
    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
    pub schema_name: String,
    /// Metadata of the table. It has no builder default and must be provided.
    pub meta: TableMeta,
    /// Type of the table; the builder defaults it to `TableType::Base`.
    #[builder(default = "TableType::Base")]
    pub table_type: TableType,
}
1079
1080pub type TableInfoRef = Arc<TableInfo>;
1081
1082impl TableInfo {
1083    pub fn table_id(&self) -> TableId {
1084        self.ident.table_id
1085    }
1086
1087    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1088    pub fn full_table_name(&self) -> String {
1089        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1090    }
1091
1092    pub fn get_db_string(&self) -> String {
1093        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1094    }
1095
1096    /// Returns true when the table is the metric engine's physical table.
1097    pub fn is_physical_table(&self) -> bool {
1098        self.meta
1099            .options
1100            .extra_options
1101            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1102    }
1103
1104    /// Return true if the table's TTL is `instant`.
1105    pub fn is_ttl_instant_table(&self) -> bool {
1106        self.meta
1107            .options
1108            .ttl
1109            .map(|t| t.is_instant())
1110            .unwrap_or(false)
1111    }
1112}
1113
1114impl TableInfoBuilder {
1115    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1116        Self {
1117            name: Some(name.into()),
1118            meta: Some(meta),
1119            ..Default::default()
1120        }
1121    }
1122
1123    pub fn table_id(mut self, id: TableId) -> Self {
1124        let ident = self.ident.get_or_insert_with(TableIdent::default);
1125        ident.table_id = id;
1126        self
1127    }
1128
1129    pub fn table_version(mut self, version: TableVersion) -> Self {
1130        let ident = self.ident.get_or_insert_with(TableIdent::default);
1131        ident.version = version;
1132        self
1133    }
1134}
1135
1136impl TableIdent {
1137    pub fn new(table_id: TableId) -> Self {
1138        Self {
1139            table_id,
1140            version: 0,
1141        }
1142    }
1143}
1144
1145impl From<TableId> for TableIdent {
1146    fn from(table_id: TableId) -> Self {
1147        Self::new(table_id)
1148    }
1149}
1150
/// Struct used to serialize and deserialize [`TableMeta`].
///
/// `Serialize` is derived, while `Deserialize` is implemented by hand for this type.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Default)]
pub struct RawTableMeta {
    /// The raw (serializable) schema of the table.
    pub schema: RawSchema,
    /// The indices of columns in primary key. Note that the index of timestamp column
    /// is not included. Order matters to this array.
    pub primary_key_indices: Vec<usize>,
    /// The indices of columns in value. The index of timestamp column is included.
    /// Order doesn't matter to this array.
    pub value_indices: Vec<usize>,
    /// Engine type of this table. Usually in lower case.
    pub engine: String,
    /// Next column id of a new column.
    /// It's used to ensure all columns with the same name across all regions have the same column id.
    pub next_column_id: ColumnId,
    /// Options of the table.
    pub options: TableOptions,
    /// Creation time of the table meta.
    pub created_on: DateTime<Utc>,
    /// Last update time of the table meta.
    pub updated_on: DateTime<Utc>,
    /// The indices of the partition key columns.
    /// Order doesn't matter to this array.
    #[serde(default)]
    pub partition_key_indices: Vec<usize>,
    /// Column ids of the table. `RawTableInfo::name_to_ids` pairs them with the column
    /// schemas by position when both have the same length.
    /// Note: This field may be empty for older versions that did not include this field.
    #[serde(default)]
    pub column_ids: Vec<ColumnId>,
}
1177
1178impl<'de> Deserialize<'de> for RawTableMeta {
1179    fn deserialize<D>(
1180        deserializer: D,
1181    ) -> std::result::Result<RawTableMeta, <D as Deserializer<'de>>::Error>
1182    where
1183        D: Deserializer<'de>,
1184    {
1185        #[derive(Deserialize)]
1186        struct Helper {
1187            schema: RawSchema,
1188            primary_key_indices: Vec<usize>,
1189            value_indices: Vec<usize>,
1190            engine: String,
1191            next_column_id: u32,
1192            options: TableOptions,
1193            created_on: DateTime<Utc>,
1194            updated_on: Option<DateTime<Utc>>,
1195            #[serde(default)]
1196            partition_key_indices: Vec<usize>,
1197            #[serde(default)]
1198            column_ids: Vec<ColumnId>,
1199        }
1200
1201        let h = Helper::deserialize(deserializer)?;
1202        Ok(RawTableMeta {
1203            schema: h.schema,
1204            primary_key_indices: h.primary_key_indices,
1205            value_indices: h.value_indices,
1206            engine: h.engine,
1207            next_column_id: h.next_column_id,
1208            options: h.options,
1209            created_on: h.created_on,
1210            updated_on: h.updated_on.unwrap_or(h.created_on),
1211            partition_key_indices: h.partition_key_indices,
1212            column_ids: h.column_ids,
1213        })
1214    }
1215}
1216
1217impl From<TableMeta> for RawTableMeta {
1218    fn from(meta: TableMeta) -> RawTableMeta {
1219        RawTableMeta {
1220            schema: RawSchema::from(&*meta.schema),
1221            primary_key_indices: meta.primary_key_indices,
1222            value_indices: meta.value_indices,
1223            engine: meta.engine,
1224            next_column_id: meta.next_column_id,
1225            options: meta.options,
1226            created_on: meta.created_on,
1227            updated_on: meta.updated_on,
1228            partition_key_indices: meta.partition_key_indices,
1229            column_ids: meta.column_ids,
1230        }
1231    }
1232}
1233
1234impl TryFrom<RawTableMeta> for TableMeta {
1235    type Error = ConvertError;
1236
1237    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1238        Ok(TableMeta {
1239            schema: Arc::new(Schema::try_from(raw.schema)?),
1240            primary_key_indices: raw.primary_key_indices,
1241            value_indices: raw.value_indices,
1242            engine: raw.engine,
1243            next_column_id: raw.next_column_id,
1244            options: raw.options,
1245            created_on: raw.created_on,
1246            updated_on: raw.updated_on,
1247            partition_key_indices: raw.partition_key_indices,
1248            column_ids: raw.column_ids,
1249        })
1250    }
1251}
1252
/// Struct used to serialize and deserialize [`TableInfo`].
///
/// It has the same fields as [`TableInfo`], except that `meta` is a [`RawTableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct RawTableInfo {
    /// Id and version of the table.
    pub ident: TableIdent,
    /// Name of the table.
    pub name: String,
    /// Comment of the table.
    pub desc: Option<String>,
    /// Name of the catalog.
    pub catalog_name: String,
    /// Name of the schema.
    pub schema_name: String,
    /// Serializable metadata of the table.
    pub meta: RawTableMeta,
    /// Type of the table.
    pub table_type: TableType,
}
1264
1265impl RawTableInfo {
1266    /// Returns the map of column name to column id.
1267    ///
1268    /// Note: This method may return an empty map for older versions that did not include this field.
1269    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1270        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1271            None
1272        } else {
1273            Some(
1274                self.meta
1275                    .column_ids
1276                    .iter()
1277                    .enumerate()
1278                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1279                    .collect(),
1280            )
1281        }
1282    }
1283
1284    /// Sort the columns in [RawTableInfo], logical tables require it.
1285    pub fn sort_columns(&mut self) {
1286        let column_schemas = &self.meta.schema.column_schemas;
1287        let primary_keys = self
1288            .meta
1289            .primary_key_indices
1290            .iter()
1291            .map(|index| column_schemas[*index].name.clone())
1292            .collect::<HashSet<_>>();
1293
1294        let name_to_ids = self.name_to_ids().unwrap_or_default();
1295        self.meta
1296            .schema
1297            .column_schemas
1298            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1299
1300        // Compute new indices of sorted columns
1301        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1302        let mut timestamp_index = None;
1303        let mut value_indices =
1304            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1305        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1306        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1307            if primary_keys.contains(&column_schema.name) {
1308                primary_key_indices.push(index);
1309            } else if column_schema.is_time_index() {
1310                value_indices.push(index);
1311                timestamp_index = Some(index);
1312            } else {
1313                value_indices.push(index);
1314            }
1315            if let Some(id) = name_to_ids.get(&column_schema.name) {
1316                column_ids.push(*id);
1317            }
1318        }
1319
1320        // Overwrite table meta
1321        self.meta.schema.timestamp_index = timestamp_index;
1322        self.meta.primary_key_indices = primary_key_indices;
1323        self.meta.value_indices = value_indices;
1324        self.meta.column_ids = column_ids;
1325    }
1326
1327    /// Extracts region options from table info.
1328    ///
1329    /// All "region options" are actually a copy of table options for redundancy.
1330    pub fn to_region_options(&self) -> HashMap<String, String> {
1331        HashMap::from(&self.meta.options)
1332    }
1333
1334    /// Returns the table reference.
1335    pub fn table_ref(&self) -> TableReference<'_> {
1336        TableReference::full(
1337            self.catalog_name.as_str(),
1338            self.schema_name.as_str(),
1339            self.name.as_str(),
1340        )
1341    }
1342}
1343
1344impl From<TableInfo> for RawTableInfo {
1345    fn from(info: TableInfo) -> RawTableInfo {
1346        RawTableInfo {
1347            ident: info.ident,
1348            name: info.name,
1349            desc: info.desc,
1350            catalog_name: info.catalog_name,
1351            schema_name: info.schema_name,
1352            meta: RawTableMeta::from(info.meta),
1353            table_type: info.table_type,
1354        }
1355    }
1356}
1357
1358impl TryFrom<RawTableInfo> for TableInfo {
1359    type Error = ConvertError;
1360
1361    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1362        Ok(TableInfo {
1363            ident: raw.ident,
1364            name: raw.name,
1365            desc: raw.desc,
1366            catalog_name: raw.catalog_name,
1367            schema_name: raw.schema_name,
1368            meta: TableMeta::try_from(raw.meta)?,
1369            table_type: raw.table_type,
1370        })
1371    }
1372}
1373
1374/// Set column fulltext options if it passed the validation.
1375///
1376/// Options allowed to modify:
1377/// * backend
1378///
1379/// Options not allowed to modify:
1380/// * analyzer
1381/// * case_sensitive
1382fn set_column_fulltext_options(
1383    column_schema: &mut ColumnSchema,
1384    column_name: &str,
1385    options: &FulltextOptions,
1386    current_options: Option<FulltextOptions>,
1387) -> Result<()> {
1388    if let Some(current_options) = current_options {
1389        ensure!(
1390            current_options.analyzer == options.analyzer
1391                && current_options.case_sensitive == options.case_sensitive,
1392            error::InvalidColumnOptionSnafu {
1393                column_name,
1394                msg: format!(
1395                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1396                    current_options.analyzer, current_options.case_sensitive
1397                ),
1398            }
1399        );
1400    }
1401
1402    column_schema
1403        .set_fulltext_options(options)
1404        .context(error::SetFulltextOptionsSnafu { column_name })?;
1405
1406    Ok(())
1407}
1408
1409fn unset_column_fulltext_options(
1410    column_schema: &mut ColumnSchema,
1411    column_name: &str,
1412    current_options: Option<FulltextOptions>,
1413) -> Result<()> {
1414    ensure!(
1415        current_options
1416            .as_ref()
1417            .is_some_and(|options| options.enable),
1418        error::InvalidColumnOptionSnafu {
1419            column_name,
1420            msg: "FULLTEXT index already disabled".to_string(),
1421        }
1422    );
1423
1424    let mut options = current_options.unwrap();
1425    options.enable = false;
1426    column_schema
1427        .set_fulltext_options(&options)
1428        .context(error::SetFulltextOptionsSnafu { column_name })?;
1429
1430    Ok(())
1431}
1432
1433fn set_column_skipping_index_options(
1434    column_schema: &mut ColumnSchema,
1435    column_name: &str,
1436    options: &SkippingIndexOptions,
1437) -> Result<()> {
1438    column_schema
1439        .set_skipping_options(options)
1440        .context(error::SetSkippingOptionsSnafu { column_name })?;
1441
1442    Ok(())
1443}
1444
1445fn unset_column_skipping_index_options(
1446    column_schema: &mut ColumnSchema,
1447    column_name: &str,
1448) -> Result<()> {
1449    column_schema
1450        .unset_skipping_options()
1451        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1452    Ok(())
1453}
1454
1455#[cfg(test)]
1456mod tests {
1457    use std::assert_matches::assert_matches;
1458
1459    use common_error::ext::ErrorExt;
1460    use common_error::status_code::StatusCode;
1461    use datatypes::data_type::ConcreteDataType;
1462    use datatypes::schema::{
1463        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1464    };
1465
1466    use super::*;
1467    use crate::Error;
1468
1469    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1470    fn new_test_schema() -> Schema {
1471        let column_schemas = vec![
1472            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1473            ColumnSchema::new(
1474                "ts",
1475                ConcreteDataType::timestamp_millisecond_datatype(),
1476                false,
1477            )
1478            .with_time_index(true),
1479            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1480        ];
1481        SchemaBuilder::try_from(column_schemas)
1482            .unwrap()
1483            .version(123)
1484            .build()
1485            .unwrap()
1486    }
1487
1488    #[test]
1489    fn test_raw_convert() {
1490        let schema = Arc::new(new_test_schema());
1491        let meta = TableMetaBuilder::empty()
1492            .schema(schema)
1493            .primary_key_indices(vec![0])
1494            .engine("engine")
1495            .next_column_id(3)
1496            .build()
1497            .unwrap();
1498        let info = TableInfoBuilder::default()
1499            .table_id(10)
1500            .table_version(5)
1501            .name("mytable")
1502            .meta(meta)
1503            .build()
1504            .unwrap();
1505
1506        let raw = RawTableInfo::from(info.clone());
1507        let info_new = TableInfo::try_from(raw).unwrap();
1508
1509        assert_eq!(info, info_new);
1510    }
1511
1512    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1513        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1514        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1515        let alter_kind = AlterKind::AddColumns {
1516            columns: vec![
1517                AddColumnRequest {
1518                    column_schema: new_tag,
1519                    is_key: true,
1520                    location: None,
1521                    add_if_not_exists: false,
1522                },
1523                AddColumnRequest {
1524                    column_schema: new_field,
1525                    is_key: false,
1526                    location: None,
1527                    add_if_not_exists: false,
1528                },
1529            ],
1530        };
1531
1532        let builder = meta
1533            .builder_with_alter_kind("my_table", &alter_kind)
1534            .unwrap();
1535        builder.build().unwrap()
1536    }
1537
1538    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1539        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1540        let new_field = ColumnSchema::new(
1541            "my_field_after_ts",
1542            ConcreteDataType::string_datatype(),
1543            true,
1544        );
1545        let yet_another_field = ColumnSchema::new(
1546            "yet_another_field_after_ts",
1547            ConcreteDataType::int64_datatype(),
1548            true,
1549        );
1550        let alter_kind = AlterKind::AddColumns {
1551            columns: vec![
1552                AddColumnRequest {
1553                    column_schema: new_tag,
1554                    is_key: true,
1555                    location: Some(AddColumnLocation::First),
1556                    add_if_not_exists: false,
1557                },
1558                AddColumnRequest {
1559                    column_schema: new_field,
1560                    is_key: false,
1561                    location: Some(AddColumnLocation::After {
1562                        column_name: "ts".to_string(),
1563                    }),
1564                    add_if_not_exists: false,
1565                },
1566                AddColumnRequest {
1567                    column_schema: yet_another_field,
1568                    is_key: true,
1569                    location: Some(AddColumnLocation::After {
1570                        column_name: "ts".to_string(),
1571                    }),
1572                    add_if_not_exists: false,
1573                },
1574            ],
1575        };
1576
1577        let builder = meta
1578            .builder_with_alter_kind("my_table", &alter_kind)
1579            .unwrap();
1580        builder.build().unwrap()
1581    }
1582
1583    #[test]
1584    fn test_add_columns() {
1585        let schema = Arc::new(new_test_schema());
1586        let meta = TableMetaBuilder::empty()
1587            .schema(schema)
1588            .primary_key_indices(vec![0])
1589            .engine("engine")
1590            .next_column_id(3)
1591            .build()
1592            .unwrap();
1593
1594        let new_meta = add_columns_to_meta(&meta);
1595        let names: Vec<String> = new_meta
1596            .schema
1597            .column_schemas()
1598            .iter()
1599            .map(|column_schema| column_schema.name.clone())
1600            .collect();
1601        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1602        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1603        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1604    }
1605
1606    #[test]
1607    fn test_add_columns_multiple_times() {
1608        let schema = Arc::new(new_test_schema());
1609        let meta = TableMetaBuilder::empty()
1610            .schema(schema)
1611            .primary_key_indices(vec![0])
1612            .engine("engine")
1613            .next_column_id(3)
1614            .build()
1615            .unwrap();
1616
1617        let alter_kind = AlterKind::AddColumns {
1618            columns: vec![
1619                AddColumnRequest {
1620                    column_schema: ColumnSchema::new(
1621                        "col3",
1622                        ConcreteDataType::int32_datatype(),
1623                        true,
1624                    ),
1625                    is_key: true,
1626                    location: None,
1627                    add_if_not_exists: true,
1628                },
1629                AddColumnRequest {
1630                    column_schema: ColumnSchema::new(
1631                        "col3",
1632                        ConcreteDataType::int32_datatype(),
1633                        true,
1634                    ),
1635                    is_key: true,
1636                    location: None,
1637                    add_if_not_exists: true,
1638                },
1639            ],
1640        };
1641        let err = meta
1642            .builder_with_alter_kind("my_table", &alter_kind)
1643            .err()
1644            .unwrap();
1645        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1646    }
1647
1648    #[test]
1649    fn test_remove_columns() {
1650        let schema = Arc::new(new_test_schema());
1651        let meta = TableMetaBuilder::empty()
1652            .schema(schema.clone())
1653            .primary_key_indices(vec![0])
1654            .engine("engine")
1655            .next_column_id(3)
1656            .build()
1657            .unwrap();
1658        // Add more columns so we have enough candidate columns to remove.
1659        let meta = add_columns_to_meta(&meta);
1660
1661        let alter_kind = AlterKind::DropColumns {
1662            names: vec![String::from("col2"), String::from("my_field")],
1663        };
1664        let new_meta = meta
1665            .builder_with_alter_kind("my_table", &alter_kind)
1666            .unwrap()
1667            .build()
1668            .unwrap();
1669
1670        let names: Vec<String> = new_meta
1671            .schema
1672            .column_schemas()
1673            .iter()
1674            .map(|column_schema| column_schema.name.clone())
1675            .collect();
1676        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1677        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1678        assert_eq!(&[1], &new_meta.value_indices[..]);
1679        assert_eq!(
1680            schema.timestamp_column(),
1681            new_meta.schema.timestamp_column()
1682        );
1683    }
1684
1685    #[test]
1686    fn test_remove_multiple_columns_before_timestamp() {
1687        let column_schemas = vec![
1688            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1689            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1690            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1691            ColumnSchema::new(
1692                "ts",
1693                ConcreteDataType::timestamp_millisecond_datatype(),
1694                false,
1695            )
1696            .with_time_index(true),
1697        ];
1698        let schema = Arc::new(
1699            SchemaBuilder::try_from(column_schemas)
1700                .unwrap()
1701                .version(123)
1702                .build()
1703                .unwrap(),
1704        );
1705        let meta = TableMetaBuilder::empty()
1706            .schema(schema.clone())
1707            .primary_key_indices(vec![1])
1708            .engine("engine")
1709            .next_column_id(4)
1710            .build()
1711            .unwrap();
1712
1713        // Remove columns in reverse order to test whether timestamp index is valid.
1714        let alter_kind = AlterKind::DropColumns {
1715            names: vec![String::from("col3"), String::from("col1")],
1716        };
1717        let new_meta = meta
1718            .builder_with_alter_kind("my_table", &alter_kind)
1719            .unwrap()
1720            .build()
1721            .unwrap();
1722
1723        let names: Vec<String> = new_meta
1724            .schema
1725            .column_schemas()
1726            .iter()
1727            .map(|column_schema| column_schema.name.clone())
1728            .collect();
1729        assert_eq!(&["col2", "ts"], &names[..]);
1730        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1731        assert_eq!(&[1], &new_meta.value_indices[..]);
1732        assert_eq!(
1733            schema.timestamp_column(),
1734            new_meta.schema.timestamp_column()
1735        );
1736    }
1737
1738    #[test]
1739    fn test_add_existing_column() {
1740        let schema = Arc::new(new_test_schema());
1741        let meta = TableMetaBuilder::empty()
1742            .schema(schema)
1743            .primary_key_indices(vec![0])
1744            .engine("engine")
1745            .next_column_id(3)
1746            .build()
1747            .unwrap();
1748
1749        let alter_kind = AlterKind::AddColumns {
1750            columns: vec![AddColumnRequest {
1751                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1752                is_key: false,
1753                location: None,
1754                add_if_not_exists: false,
1755            }],
1756        };
1757
1758        let err = meta
1759            .builder_with_alter_kind("my_table", &alter_kind)
1760            .err()
1761            .unwrap();
1762        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1763
1764        // Add if not exists
1765        let alter_kind = AlterKind::AddColumns {
1766            columns: vec![AddColumnRequest {
1767                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1768                is_key: true,
1769                location: None,
1770                add_if_not_exists: true,
1771            }],
1772        };
1773        let new_meta = meta
1774            .builder_with_alter_kind("my_table", &alter_kind)
1775            .unwrap()
1776            .build()
1777            .unwrap();
1778        assert_eq!(
1779            meta.schema.column_schemas(),
1780            new_meta.schema.column_schemas()
1781        );
1782        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1783    }
1784
1785    #[test]
1786    fn test_add_different_type_column() {
1787        let schema = Arc::new(new_test_schema());
1788        let meta = TableMetaBuilder::empty()
1789            .schema(schema)
1790            .primary_key_indices(vec![0])
1791            .engine("engine")
1792            .next_column_id(3)
1793            .build()
1794            .unwrap();
1795
1796        // Add if not exists, but different type.
1797        let alter_kind = AlterKind::AddColumns {
1798            columns: vec![AddColumnRequest {
1799                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1800                is_key: false,
1801                location: None,
1802                add_if_not_exists: true,
1803            }],
1804        };
1805        let err = meta
1806            .builder_with_alter_kind("my_table", &alter_kind)
1807            .err()
1808            .unwrap();
1809        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1810    }
1811
1812    #[test]
1813    fn test_add_invalid_column() {
1814        let schema = Arc::new(new_test_schema());
1815        let meta = TableMetaBuilder::empty()
1816            .schema(schema)
1817            .primary_key_indices(vec![0])
1818            .engine("engine")
1819            .next_column_id(3)
1820            .build()
1821            .unwrap();
1822
1823        // Not nullable and no default value.
1824        let alter_kind = AlterKind::AddColumns {
1825            columns: vec![AddColumnRequest {
1826                column_schema: ColumnSchema::new(
1827                    "weny",
1828                    ConcreteDataType::string_datatype(),
1829                    false,
1830                ),
1831                is_key: false,
1832                location: None,
1833                add_if_not_exists: false,
1834            }],
1835        };
1836
1837        let err = meta
1838            .builder_with_alter_kind("my_table", &alter_kind)
1839            .err()
1840            .unwrap();
1841        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1842    }
1843
1844    #[test]
1845    fn test_remove_unknown_column() {
1846        let schema = Arc::new(new_test_schema());
1847        let meta = TableMetaBuilder::empty()
1848            .schema(schema)
1849            .primary_key_indices(vec![0])
1850            .engine("engine")
1851            .next_column_id(3)
1852            .build()
1853            .unwrap();
1854
1855        let alter_kind = AlterKind::DropColumns {
1856            names: vec![String::from("unknown")],
1857        };
1858
1859        let err = meta
1860            .builder_with_alter_kind("my_table", &alter_kind)
1861            .err()
1862            .unwrap();
1863        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1864    }
1865
1866    #[test]
1867    fn test_change_unknown_column_data_type() {
1868        let schema = Arc::new(new_test_schema());
1869        let meta = TableMetaBuilder::empty()
1870            .schema(schema)
1871            .primary_key_indices(vec![0])
1872            .engine("engine")
1873            .next_column_id(3)
1874            .build()
1875            .unwrap();
1876
1877        let alter_kind = AlterKind::ModifyColumnTypes {
1878            columns: vec![ModifyColumnTypeRequest {
1879                column_name: "unknown".to_string(),
1880                target_type: ConcreteDataType::string_datatype(),
1881            }],
1882        };
1883
1884        let err = meta
1885            .builder_with_alter_kind("my_table", &alter_kind)
1886            .err()
1887            .unwrap();
1888        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1889    }
1890
1891    #[test]
1892    fn test_remove_key_column() {
1893        let schema = Arc::new(new_test_schema());
1894        let meta = TableMetaBuilder::empty()
1895            .schema(schema)
1896            .primary_key_indices(vec![0])
1897            .engine("engine")
1898            .next_column_id(3)
1899            .build()
1900            .unwrap();
1901
1902        // Remove column in primary key.
1903        let alter_kind = AlterKind::DropColumns {
1904            names: vec![String::from("col1")],
1905        };
1906
1907        let err = meta
1908            .builder_with_alter_kind("my_table", &alter_kind)
1909            .err()
1910            .unwrap();
1911        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1912
1913        // Remove timestamp column.
1914        let alter_kind = AlterKind::DropColumns {
1915            names: vec![String::from("ts")],
1916        };
1917
1918        let err = meta
1919            .builder_with_alter_kind("my_table", &alter_kind)
1920            .err()
1921            .unwrap();
1922        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1923    }
1924
1925    #[test]
1926    fn test_remove_partition_column() {
1927        let schema = Arc::new(new_test_schema());
1928        let meta = TableMetaBuilder::empty()
1929            .schema(schema)
1930            .primary_key_indices(vec![])
1931            .partition_key_indices(vec![0])
1932            .engine("engine")
1933            .next_column_id(3)
1934            .build()
1935            .unwrap();
1936        // Remove column in primary key.
1937        let alter_kind = AlterKind::DropColumns {
1938            names: vec![String::from("col1")],
1939        };
1940
1941        let err = meta
1942            .builder_with_alter_kind("my_table", &alter_kind)
1943            .err()
1944            .unwrap();
1945        assert_matches!(err, Error::RemovePartitionColumn { .. });
1946    }
1947
1948    #[test]
1949    fn test_change_key_column_data_type() {
1950        let schema = Arc::new(new_test_schema());
1951        let meta = TableMetaBuilder::empty()
1952            .schema(schema)
1953            .primary_key_indices(vec![0])
1954            .engine("engine")
1955            .next_column_id(3)
1956            .build()
1957            .unwrap();
1958
1959        // Remove column in primary key.
1960        let alter_kind = AlterKind::ModifyColumnTypes {
1961            columns: vec![ModifyColumnTypeRequest {
1962                column_name: "col1".to_string(),
1963                target_type: ConcreteDataType::string_datatype(),
1964            }],
1965        };
1966
1967        let err = meta
1968            .builder_with_alter_kind("my_table", &alter_kind)
1969            .err()
1970            .unwrap();
1971        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1972
1973        // Remove timestamp column.
1974        let alter_kind = AlterKind::ModifyColumnTypes {
1975            columns: vec![ModifyColumnTypeRequest {
1976                column_name: "ts".to_string(),
1977                target_type: ConcreteDataType::string_datatype(),
1978            }],
1979        };
1980
1981        let err = meta
1982            .builder_with_alter_kind("my_table", &alter_kind)
1983            .err()
1984            .unwrap();
1985        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1986    }
1987
1988    #[test]
1989    fn test_alloc_new_column() {
1990        let schema = Arc::new(new_test_schema());
1991        let mut meta = TableMetaBuilder::empty()
1992            .schema(schema)
1993            .primary_key_indices(vec![0])
1994            .engine("engine")
1995            .next_column_id(3)
1996            .build()
1997            .unwrap();
1998        assert_eq!(3, meta.next_column_id);
1999
2000        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
2001        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
2002
2003        assert_eq!(4, meta.next_column_id);
2004        assert_eq!(column_schema.name, desc.name);
2005    }
2006
2007    #[test]
2008    fn test_add_columns_with_location() {
2009        let schema = Arc::new(new_test_schema());
2010        let meta = TableMetaBuilder::empty()
2011            .schema(schema)
2012            .primary_key_indices(vec![0])
2013            // partition col: col1, col2
2014            .partition_key_indices(vec![0, 2])
2015            .engine("engine")
2016            .next_column_id(3)
2017            .build()
2018            .unwrap();
2019
2020        let new_meta = add_columns_to_meta_with_location(&meta);
2021        let names: Vec<String> = new_meta
2022            .schema
2023            .column_schemas()
2024            .iter()
2025            .map(|column_schema| column_schema.name.clone())
2026            .collect();
2027        assert_eq!(
2028            &[
2029                "my_tag_first",               // primary key column
2030                "col1",                       // partition column
2031                "ts",                         // timestamp column
2032                "yet_another_field_after_ts", // primary key column
2033                "my_field_after_ts",          // value column
2034                "col2",                       // partition column
2035            ],
2036            &names[..]
2037        );
2038        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
2039        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
2040        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
2041    }
2042
2043    #[test]
2044    fn test_modify_column_fulltext_options() {
2045        let schema = Arc::new(new_test_schema());
2046        let meta = TableMetaBuilder::empty()
2047            .schema(schema)
2048            .primary_key_indices(vec![0])
2049            .engine("engine")
2050            .next_column_id(3)
2051            .build()
2052            .unwrap();
2053
2054        let alter_kind = AlterKind::SetIndexes {
2055            options: vec![SetIndexOption::Fulltext {
2056                column_name: "col1".to_string(),
2057                options: FulltextOptions::default(),
2058            }],
2059        };
2060        let err = meta
2061            .builder_with_alter_kind("my_table", &alter_kind)
2062            .err()
2063            .unwrap();
2064        assert_eq!(
2065            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2066            err.to_string()
2067        );
2068
2069        // Add a string column and make it fulltext indexed
2070        let new_meta = add_columns_to_meta_with_location(&meta);
2071        let alter_kind = AlterKind::SetIndexes {
2072            options: vec![SetIndexOption::Fulltext {
2073                column_name: "my_tag_first".to_string(),
2074                options: FulltextOptions::new_unchecked(
2075                    true,
2076                    FulltextAnalyzer::Chinese,
2077                    true,
2078                    FulltextBackend::Bloom,
2079                    1000,
2080                    0.01,
2081                ),
2082            }],
2083        };
2084        let new_meta = new_meta
2085            .builder_with_alter_kind("my_table", &alter_kind)
2086            .unwrap()
2087            .build()
2088            .unwrap();
2089        let column_schema = new_meta
2090            .schema
2091            .column_schema_by_name("my_tag_first")
2092            .unwrap();
2093        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2094        assert!(fulltext_options.enable);
2095        assert_eq!(
2096            datatypes::schema::FulltextAnalyzer::Chinese,
2097            fulltext_options.analyzer
2098        );
2099        assert!(fulltext_options.case_sensitive);
2100
2101        let alter_kind = AlterKind::UnsetIndexes {
2102            options: vec![UnsetIndexOption::Fulltext {
2103                column_name: "my_tag_first".to_string(),
2104            }],
2105        };
2106        let new_meta = new_meta
2107            .builder_with_alter_kind("my_table", &alter_kind)
2108            .unwrap()
2109            .build()
2110            .unwrap();
2111        let column_schema = new_meta
2112            .schema
2113            .column_schema_by_name("my_tag_first")
2114            .unwrap();
2115        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2116        assert!(!fulltext_options.enable);
2117    }
2118}