table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetDefaultRequest, SetIndexOption,
39    TableOptions, UnsetIndexOption,
40};
41use crate::table_reference::TableReference;
42
43pub type TableId = u32;
44pub type TableVersion = u64;
45
46/// Indicates whether and how a filter expression can be handled by a
47/// Table for table scans.
48#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
49pub enum FilterPushDownType {
50    /// The expression cannot be used by the provider.
51    Unsupported,
52    /// The expression can be used to help minimise the data retrieved,
53    /// but the provider cannot guarantee that all returned tuples
54    /// satisfy the filter. The Filter plan node containing this expression
55    /// will be preserved.
56    Inexact,
57    /// The provider guarantees that all returned data satisfies this
58    /// filter expression. The Filter plan node containing this expression
59    /// will be removed.
60    Exact,
61}
62
63impl From<TableProviderFilterPushDown> for FilterPushDownType {
64    fn from(value: TableProviderFilterPushDown) -> Self {
65        match value {
66            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
67            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
68            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
69        }
70    }
71}
72
73impl From<FilterPushDownType> for TableProviderFilterPushDown {
74    fn from(value: FilterPushDownType) -> Self {
75        match value {
76            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
77            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
78            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
79        }
80    }
81}
82
83/// Indicates the type of this table for metadata/catalog purposes.
84#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
85pub enum TableType {
86    /// An ordinary physical table.
87    Base,
88    /// A non-materialised table that itself uses a query internally to provide data.
89    View,
90    /// A transient table.
91    Temporary,
92}
93
94impl std::fmt::Display for TableType {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            TableType::Base => f.write_str("BASE TABLE"),
98            TableType::Temporary => f.write_str("TEMPORARY"),
99            TableType::View => f.write_str("VIEW"),
100        }
101    }
102}
103
104/// Identifier of the table.
105#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
106pub struct TableIdent {
107    /// Unique id of this table.
108    pub table_id: TableId,
109    /// Version of the table, bumped when metadata (such as schema) of the table
110    /// being changed.
111    pub version: TableVersion,
112}
113
114/// The table metadata.
115///
116/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
117#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
118#[builder(pattern = "mutable", custom_constructor)]
119pub struct TableMeta {
120    pub schema: SchemaRef,
121    /// The indices of columns in primary key. Note that the index of timestamp column
122    /// is not included in these indices.
123    pub primary_key_indices: Vec<usize>,
124    #[builder(default = "self.default_value_indices()?")]
125    pub value_indices: Vec<usize>,
126    #[builder(default, setter(into))]
127    pub engine: String,
128    #[builder(default, setter(into))]
129    pub region_numbers: Vec<u32>,
130    pub next_column_id: ColumnId,
131    /// Table options.
132    #[builder(default)]
133    pub options: TableOptions,
134    #[builder(default = "Utc::now()")]
135    pub created_on: DateTime<Utc>,
136    #[builder(default = "Vec::new()")]
137    pub partition_key_indices: Vec<usize>,
138    #[builder(default = "Vec::new()")]
139    pub column_ids: Vec<ColumnId>,
140}
141
142impl TableMetaBuilder {
143    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
144    #[cfg(any(test, feature = "testing"))]
145    pub fn empty() -> Self {
146        Self {
147            schema: None,
148            primary_key_indices: None,
149            value_indices: None,
150            engine: None,
151            region_numbers: None,
152            next_column_id: None,
153            options: None,
154            created_on: None,
155            partition_key_indices: None,
156            column_ids: None,
157        }
158    }
159}
160
161impl TableMetaBuilder {
162    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
163        match (&self.primary_key_indices, &self.schema) {
164            (Some(v), Some(schema)) => {
165                let column_schemas = schema.column_schemas();
166                Ok((0..column_schemas.len())
167                    .filter(|idx| !v.contains(idx))
168                    .collect())
169            }
170            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
171        }
172    }
173
174    pub fn new_external_table() -> Self {
175        Self {
176            schema: None,
177            primary_key_indices: Some(Vec::new()),
178            value_indices: Some(Vec::new()),
179            engine: None,
180            region_numbers: Some(Vec::new()),
181            next_column_id: Some(0),
182            options: None,
183            created_on: None,
184            partition_key_indices: None,
185            column_ids: None,
186        }
187    }
188}
189
190/// The result after splitting requests by column location info.
191struct SplitResult<'a> {
192    /// column requests should be added at first place.
193    columns_at_first: Vec<&'a AddColumnRequest>,
194    /// column requests should be added after already exist columns.
195    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
196    /// column requests should be added at last place.
197    columns_at_last: Vec<&'a AddColumnRequest>,
198    /// all column names should be added.
199    column_names: Vec<String>,
200}
201
202impl TableMeta {
203    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
204        let columns_schemas = &self.schema.column_schemas();
205        self.primary_key_indices
206            .iter()
207            .map(|idx| &columns_schemas[*idx].name)
208    }
209
210    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
211        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
212        let columns_schemas = self.schema.column_schemas();
213        let primary_key_indices = &self.primary_key_indices;
214        columns_schemas
215            .iter()
216            .enumerate()
217            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
218            .map(|(_, cs)| &cs.name)
219    }
220
221    pub fn partition_column_names(&self) -> impl Iterator<Item = &String> {
222        let columns_schemas = &self.schema.column_schemas();
223        self.partition_key_indices
224            .iter()
225            .map(|idx| &columns_schemas[*idx].name)
226    }
227
228    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
229    ///
230    /// The returned builder would derive the next column id of this meta.
231    pub fn builder_with_alter_kind(
232        &self,
233        table_name: &str,
234        alter_kind: &AlterKind,
235    ) -> Result<TableMetaBuilder> {
236        match alter_kind {
237            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
238            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
239            AlterKind::ModifyColumnTypes { columns } => {
240                self.modify_column_types(table_name, columns)
241            }
242            // No need to rebuild table meta when renaming tables.
243            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
244            AlterKind::SetTableOptions { options } => self.set_table_options(options),
245            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
246            AlterKind::SetIndexes { options } => self.set_indexes(table_name, options),
247            AlterKind::UnsetIndexes { options } => self.unset_indexes(table_name, options),
248            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
249            AlterKind::SetDefaults { defaults } => self.set_defaults(table_name, defaults),
250        }
251    }
252
253    /// Creates a [TableMetaBuilder] with modified table options.
254    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
255        let mut new_options = self.options.clone();
256
257        for request in requests {
258            match request {
259                SetRegionOption::Ttl(new_ttl) => {
260                    new_options.ttl = *new_ttl;
261                }
262                SetRegionOption::Twsc(key, value) => {
263                    if !value.is_empty() {
264                        new_options
265                            .extra_options
266                            .insert(key.to_string(), value.to_string());
267                        // Ensure node restart correctly.
268                        new_options.extra_options.insert(
269                            COMPACTION_TYPE.to_string(),
270                            COMPACTION_TYPE_TWCS.to_string(),
271                        );
272                    } else {
273                        // Invalidate the previous change option if an empty value has been set.
274                        new_options.extra_options.remove(key.as_str());
275                    }
276                }
277            }
278        }
279        let mut builder = self.new_meta_builder();
280        builder.options(new_options);
281
282        Ok(builder)
283    }
284
285    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
286        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
287        self.set_table_options(&requests)
288    }
289
290    fn set_indexes(
291        &self,
292        table_name: &str,
293        requests: &[SetIndexOption],
294    ) -> Result<TableMetaBuilder> {
295        let table_schema = &self.schema;
296        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
297        for request in requests {
298            let column_name = request.column_name();
299            table_schema
300                .column_index_by_name(column_name)
301                .with_context(|| error::ColumnNotExistsSnafu {
302                    column_name,
303                    table_name,
304                })?;
305            set_index_options
306                .entry(column_name)
307                .or_default()
308                .push(request);
309        }
310
311        let mut meta_builder = self.new_meta_builder();
312        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
313        for mut column in table_schema.column_schemas().iter().cloned() {
314            if let Some(request) = set_index_options.get(column.name.as_str()) {
315                for request in request {
316                    self.set_index(&mut column, request)?;
317                }
318            }
319            columns.push(column);
320        }
321
322        let mut builder = SchemaBuilder::try_from_columns(columns)
323            .with_context(|_| error::SchemaBuildSnafu {
324                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
325            })?
326            .version(table_schema.version() + 1);
327
328        for (k, v) in table_schema.metadata().iter() {
329            builder = builder.add_metadata(k, v);
330        }
331
332        let new_schema = builder.build().with_context(|_| {
333            let column_names = requests
334                .iter()
335                .map(|request| request.column_name())
336                .collect::<Vec<_>>();
337            error::SchemaBuildSnafu {
338                msg: format!(
339                    "Table {table_name} cannot set index options with columns {column_names:?}",
340                ),
341            }
342        })?;
343        let _ = meta_builder
344            .schema(Arc::new(new_schema))
345            .primary_key_indices(self.primary_key_indices.clone());
346
347        Ok(meta_builder)
348    }
349
350    fn unset_indexes(
351        &self,
352        table_name: &str,
353        requests: &[UnsetIndexOption],
354    ) -> Result<TableMetaBuilder> {
355        let table_schema = &self.schema;
356        let mut set_index_options: HashMap<&str, Vec<_>> = HashMap::new();
357        for request in requests {
358            let column_name = request.column_name();
359            table_schema
360                .column_index_by_name(column_name)
361                .with_context(|| error::ColumnNotExistsSnafu {
362                    column_name,
363                    table_name,
364                })?;
365            set_index_options
366                .entry(column_name)
367                .or_default()
368                .push(request);
369        }
370
371        let mut meta_builder = self.new_meta_builder();
372        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
373        for mut column in table_schema.column_schemas().iter().cloned() {
374            if let Some(request) = set_index_options.get(column.name.as_str()) {
375                for request in request {
376                    self.unset_index(&mut column, request)?;
377                }
378            }
379            columns.push(column);
380        }
381
382        let mut builder = SchemaBuilder::try_from_columns(columns)
383            .with_context(|_| error::SchemaBuildSnafu {
384                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
385            })?
386            .version(table_schema.version() + 1);
387
388        for (k, v) in table_schema.metadata().iter() {
389            builder = builder.add_metadata(k, v);
390        }
391
392        let new_schema = builder.build().with_context(|_| {
393            let column_names = requests
394                .iter()
395                .map(|request| request.column_name())
396                .collect::<Vec<_>>();
397            error::SchemaBuildSnafu {
398                msg: format!(
399                    "Table {table_name} cannot set index options with columns {column_names:?}",
400                ),
401            }
402        })?;
403        let _ = meta_builder
404            .schema(Arc::new(new_schema))
405            .primary_key_indices(self.primary_key_indices.clone());
406
407        Ok(meta_builder)
408    }
409
410    fn set_index(&self, column_schema: &mut ColumnSchema, request: &SetIndexOption) -> Result<()> {
411        match request {
412            SetIndexOption::Fulltext {
413                column_name,
414                options,
415            } => {
416                ensure!(
417                    column_schema.data_type.is_string(),
418                    error::InvalidColumnOptionSnafu {
419                        column_name,
420                        msg: "FULLTEXT index only supports string type",
421                    }
422                );
423
424                let current_fulltext_options = column_schema
425                    .fulltext_options()
426                    .context(error::SetFulltextOptionsSnafu { column_name })?;
427                set_column_fulltext_options(
428                    column_schema,
429                    column_name,
430                    options,
431                    current_fulltext_options,
432                )?;
433            }
434            SetIndexOption::Inverted { column_name } => {
435                debug_assert_eq!(column_schema.name, *column_name);
436                column_schema.set_inverted_index(true);
437            }
438            SetIndexOption::Skipping {
439                column_name,
440                options,
441            } => {
442                set_column_skipping_index_options(column_schema, column_name, options)?;
443            }
444        }
445
446        Ok(())
447    }
448
449    fn unset_index(
450        &self,
451        column_schema: &mut ColumnSchema,
452        request: &UnsetIndexOption,
453    ) -> Result<()> {
454        match request {
455            UnsetIndexOption::Fulltext { column_name } => {
456                let current_fulltext_options = column_schema
457                    .fulltext_options()
458                    .context(error::SetFulltextOptionsSnafu { column_name })?;
459                unset_column_fulltext_options(
460                    column_schema,
461                    column_name,
462                    current_fulltext_options.clone(),
463                )?
464            }
465            UnsetIndexOption::Inverted { .. } => {
466                column_schema.set_inverted_index(false);
467            }
468            UnsetIndexOption::Skipping { column_name } => {
469                unset_column_skipping_index_options(column_schema, column_name)?;
470            }
471        }
472
473        Ok(())
474    }
475
476    // TODO(yingwen): Remove this.
477    /// Allocate a new column for the table.
478    ///
479    /// This method would bump the `next_column_id` of the meta.
480    pub fn alloc_new_column(
481        &mut self,
482        table_name: &str,
483        new_column: &ColumnSchema,
484    ) -> Result<ColumnDescriptor> {
485        let desc = ColumnDescriptorBuilder::new(
486            self.next_column_id as ColumnId,
487            &new_column.name,
488            new_column.data_type.clone(),
489        )
490        .is_nullable(new_column.is_nullable())
491        .default_constraint(new_column.default_constraint().cloned())
492        .build()
493        .context(error::BuildColumnDescriptorSnafu {
494            table_name,
495            column_name: &new_column.name,
496        })?;
497
498        // Bump next column id.
499        self.next_column_id += 1;
500
501        Ok(desc)
502    }
503
504    /// Create a [`TableMetaBuilder`] from the current TableMeta.
505    fn new_meta_builder(&self) -> TableMetaBuilder {
506        let mut builder = TableMetaBuilder::from(self);
507        // Manually remove value_indices.
508        builder.value_indices = None;
509        builder
510    }
511
512    // TODO(yingwen): Tests add if not exists.
513    fn add_columns(
514        &self,
515        table_name: &str,
516        requests: &[AddColumnRequest],
517    ) -> Result<TableMetaBuilder> {
518        let table_schema = &self.schema;
519        let mut meta_builder = self.new_meta_builder();
520        let original_primary_key_indices: HashSet<&usize> =
521            self.primary_key_indices.iter().collect();
522
523        let mut names = HashSet::with_capacity(requests.len());
524        let mut new_columns = Vec::with_capacity(requests.len());
525        for col_to_add in requests {
526            if let Some(column_schema) =
527                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
528            {
529                // If the column already exists.
530                ensure!(
531                    col_to_add.add_if_not_exists,
532                    error::ColumnExistsSnafu {
533                        table_name,
534                        column_name: &col_to_add.column_schema.name
535                    },
536                );
537
538                // Checks if the type is the same
539                ensure!(
540                    column_schema.data_type == col_to_add.column_schema.data_type,
541                    error::InvalidAlterRequestSnafu {
542                        table: table_name,
543                        err: format!(
544                            "column {} already exists with different type {:?}",
545                            col_to_add.column_schema.name, column_schema.data_type,
546                        ),
547                    }
548                );
549            } else {
550                // A new column.
551                // Ensures we only add a column once.
552                ensure!(
553                    names.insert(&col_to_add.column_schema.name),
554                    error::InvalidAlterRequestSnafu {
555                        table: table_name,
556                        err: format!(
557                            "add column {} more than once",
558                            col_to_add.column_schema.name
559                        ),
560                    }
561                );
562
563                ensure!(
564                    col_to_add.column_schema.is_nullable()
565                        || col_to_add.column_schema.default_constraint().is_some(),
566                    error::InvalidAlterRequestSnafu {
567                        table: table_name,
568                        err: format!(
569                            "no default value for column {}",
570                            col_to_add.column_schema.name
571                        ),
572                    },
573                );
574
575                new_columns.push(col_to_add.clone());
576            }
577        }
578        let requests = &new_columns[..];
579
580        let SplitResult {
581            columns_at_first,
582            columns_at_after,
583            columns_at_last,
584            column_names,
585        } = self.split_requests_by_column_location(table_name, requests)?;
586        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
587        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
588        // add new columns with FIRST, and in reverse order of requests.
589        columns_at_first.iter().rev().for_each(|request| {
590            if request.is_key {
591                // If a key column is added, we also need to store its index in primary_key_indices.
592                primary_key_indices.push(columns.len());
593            }
594            columns.push(request.column_schema.clone());
595        });
596        // add existed columns in original order and handle new columns with AFTER.
597        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
598            if original_primary_key_indices.contains(&index) {
599                primary_key_indices.push(columns.len());
600            }
601            columns.push(column_schema.clone());
602            if let Some(requests) = columns_at_after.get(&column_schema.name) {
603                requests.iter().rev().for_each(|request| {
604                    if request.is_key {
605                        // If a key column is added, we also need to store its index in primary_key_indices.
606                        primary_key_indices.push(columns.len());
607                    }
608                    columns.push(request.column_schema.clone());
609                });
610            }
611        }
612        // add new columns without location info to last.
613        columns_at_last.iter().for_each(|request| {
614            if request.is_key {
615                // If a key column is added, we also need to store its index in primary_key_indices.
616                primary_key_indices.push(columns.len());
617            }
618            columns.push(request.column_schema.clone());
619        });
620
621        let mut builder = SchemaBuilder::try_from(columns)
622            .with_context(|_| error::SchemaBuildSnafu {
623                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
624            })?
625            // Also bump the schema version.
626            .version(table_schema.version() + 1);
627        for (k, v) in table_schema.metadata().iter() {
628            builder = builder.add_metadata(k, v);
629        }
630        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
631            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
632        })?;
633
634        let partition_key_indices = self
635            .partition_key_indices
636            .iter()
637            .map(|idx| table_schema.column_name_by_index(*idx))
638            // This unwrap is safe since we only add new columns.
639            .map(|name| new_schema.column_index_by_name(name).unwrap())
640            .collect();
641
642        // value_indices would be generated automatically.
643        let _ = meta_builder
644            .schema(Arc::new(new_schema))
645            .primary_key_indices(primary_key_indices)
646            .partition_key_indices(partition_key_indices);
647
648        Ok(meta_builder)
649    }
650
651    fn remove_columns(
652        &self,
653        table_name: &str,
654        column_names: &[String],
655    ) -> Result<TableMetaBuilder> {
656        let table_schema = &self.schema;
657        let column_names: HashSet<_> = column_names.iter().collect();
658        let mut meta_builder = self.new_meta_builder();
659
660        let timestamp_index = table_schema.timestamp_index();
661        // Check whether columns are existing and not in primary key index.
662        for column_name in &column_names {
663            if let Some(index) = table_schema.column_index_by_name(column_name) {
664                // This is a linear search, but since there won't be too much columns, the performance should
665                // be acceptable.
666                ensure!(
667                    !self.primary_key_indices.contains(&index),
668                    error::RemoveColumnInIndexSnafu {
669                        column_name: *column_name,
670                        table_name,
671                    }
672                );
673
674                ensure!(
675                    !self.partition_key_indices.contains(&index),
676                    error::RemovePartitionColumnSnafu {
677                        column_name: *column_name,
678                        table_name,
679                    }
680                );
681
682                if let Some(ts_index) = timestamp_index {
683                    // Not allowed to remove column in timestamp index.
684                    ensure!(
685                        index != ts_index,
686                        error::RemoveColumnInIndexSnafu {
687                            column_name: table_schema.column_name_by_index(ts_index),
688                            table_name,
689                        }
690                    );
691                }
692            } else {
693                return error::ColumnNotExistsSnafu {
694                    column_name: *column_name,
695                    table_name,
696                }
697                .fail()?;
698            }
699        }
700
701        // Collect columns after removal.
702        let columns: Vec<_> = table_schema
703            .column_schemas()
704            .iter()
705            .filter(|column_schema| !column_names.contains(&column_schema.name))
706            .cloned()
707            .collect();
708
709        let mut builder = SchemaBuilder::try_from_columns(columns)
710            .with_context(|_| error::SchemaBuildSnafu {
711                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
712            })?
713            // Also bump the schema version.
714            .version(table_schema.version() + 1);
715        for (k, v) in table_schema.metadata().iter() {
716            builder = builder.add_metadata(k, v);
717        }
718        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
719            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
720        })?;
721
722        // Rebuild the indices of primary key columns.
723        let primary_key_indices = self
724            .primary_key_indices
725            .iter()
726            .map(|idx| table_schema.column_name_by_index(*idx))
727            // This unwrap is safe since we don't allow removing a primary key column.
728            .map(|name| new_schema.column_index_by_name(name).unwrap())
729            .collect();
730
731        let partition_key_indices = self
732            .partition_key_indices
733            .iter()
734            .map(|idx| table_schema.column_name_by_index(*idx))
735            // This unwrap is safe since we don't allow removing a partition key column.
736            .map(|name| new_schema.column_index_by_name(name).unwrap())
737            .collect();
738
739        let _ = meta_builder
740            .schema(Arc::new(new_schema))
741            .primary_key_indices(primary_key_indices)
742            .partition_key_indices(partition_key_indices);
743
744        Ok(meta_builder)
745    }
746
747    fn modify_column_types(
748        &self,
749        table_name: &str,
750        requests: &[ModifyColumnTypeRequest],
751    ) -> Result<TableMetaBuilder> {
752        let table_schema = &self.schema;
753        let mut meta_builder = self.new_meta_builder();
754
755        let mut modify_column_types = HashMap::with_capacity(requests.len());
756        let timestamp_index = table_schema.timestamp_index();
757
758        for col_to_change in requests {
759            let change_column_name = &col_to_change.column_name;
760
761            let index = table_schema
762                .column_index_by_name(change_column_name)
763                .with_context(|| error::ColumnNotExistsSnafu {
764                    column_name: change_column_name,
765                    table_name,
766                })?;
767
768            let column = &table_schema.column_schemas()[index];
769
770            ensure!(
771                !self.primary_key_indices.contains(&index),
772                error::InvalidAlterRequestSnafu {
773                    table: table_name,
774                    err: format!(
775                        "Not allowed to change primary key index column '{}'",
776                        column.name
777                    )
778                }
779            );
780
781            if let Some(ts_index) = timestamp_index {
782                // Not allowed to change column datatype in timestamp index.
783                ensure!(
784                    index != ts_index,
785                    error::InvalidAlterRequestSnafu {
786                        table: table_name,
787                        err: format!(
788                            "Not allowed to change timestamp index column '{}' datatype",
789                            column.name
790                        )
791                    }
792                );
793            }
794
795            ensure!(
796                modify_column_types
797                    .insert(&col_to_change.column_name, col_to_change)
798                    .is_none(),
799                error::InvalidAlterRequestSnafu {
800                    table: table_name,
801                    err: format!(
802                        "change column datatype {} more than once",
803                        col_to_change.column_name
804                    ),
805                }
806            );
807
808            ensure!(
809                column
810                    .data_type
811                    .can_arrow_type_cast_to(&col_to_change.target_type),
812                error::InvalidAlterRequestSnafu {
813                    table: table_name,
814                    err: format!(
815                        "column '{}' cannot be cast automatically to type '{}'",
816                        col_to_change.column_name, col_to_change.target_type,
817                    ),
818                }
819            );
820
821            ensure!(
822                column.is_nullable(),
823                error::InvalidAlterRequestSnafu {
824                    table: table_name,
825                    err: format!(
826                        "column '{}' must be nullable to ensure safe conversion.",
827                        col_to_change.column_name,
828                    ),
829                }
830            );
831        }
832        // Collect columns after changed.
833
834        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
835        for mut column in table_schema.column_schemas().iter().cloned() {
836            if let Some(change_column) = modify_column_types.get(&column.name) {
837                column.data_type = change_column.target_type.clone();
838                let new_default = if let Some(default_value) = column.default_constraint() {
839                    Some(
840                        default_value
841                            .cast_to_datatype(&change_column.target_type)
842                            .with_context(|_| error::CastDefaultValueSnafu {
843                                reason: format!(
844                                    "Failed to cast default value from {:?} to type {:?}",
845                                    default_value, &change_column.target_type
846                                ),
847                            })?,
848                    )
849                } else {
850                    None
851                };
852                column = column
853                    .clone()
854                    .with_default_constraint(new_default.clone())
855                    .with_context(|_| error::CastDefaultValueSnafu {
856                        reason: format!("Failed to set new default: {:?}", new_default),
857                    })?;
858            }
859            columns.push(column)
860        }
861
862        let mut builder = SchemaBuilder::try_from_columns(columns)
863            .with_context(|_| error::SchemaBuildSnafu {
864                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
865            })?
866            // Also bump the schema version.
867            .version(table_schema.version() + 1);
868        for (k, v) in table_schema.metadata().iter() {
869            builder = builder.add_metadata(k, v);
870        }
871        let new_schema = builder.build().with_context(|_| {
872            let column_names: Vec<_> = requests
873                .iter()
874                .map(|request| &request.column_name)
875                .collect();
876
877            error::SchemaBuildSnafu {
878                msg: format!(
879                    "Table {table_name} cannot change datatype with columns {column_names:?}"
880                ),
881            }
882        })?;
883
884        let _ = meta_builder
885            .schema(Arc::new(new_schema))
886            .primary_key_indices(self.primary_key_indices.clone());
887
888        Ok(meta_builder)
889    }
890
891    /// Split requests into different groups using column location info.
892    fn split_requests_by_column_location<'a>(
893        &self,
894        table_name: &str,
895        requests: &'a [AddColumnRequest],
896    ) -> Result<SplitResult<'a>> {
897        let table_schema = &self.schema;
898        let mut columns_at_first = Vec::new();
899        let mut columns_at_after = HashMap::new();
900        let mut columns_at_last = Vec::new();
901        let mut column_names = Vec::with_capacity(requests.len());
902        for request in requests {
903            // Check whether columns to add are already existing.
904            let column_name = &request.column_schema.name;
905            column_names.push(column_name.clone());
906            ensure!(
907                table_schema.column_schema_by_name(column_name).is_none(),
908                error::ColumnExistsSnafu {
909                    column_name,
910                    table_name,
911                }
912            );
913            match request.location.as_ref() {
914                Some(AddColumnLocation::First) => {
915                    columns_at_first.push(request);
916                }
917                Some(AddColumnLocation::After { column_name }) => {
918                    ensure!(
919                        table_schema.column_schema_by_name(column_name).is_some(),
920                        error::ColumnNotExistsSnafu {
921                            column_name,
922                            table_name,
923                        }
924                    );
925                    columns_at_after
926                        .entry(column_name.clone())
927                        .or_insert(Vec::new())
928                        .push(request);
929                }
930                None => {
931                    columns_at_last.push(request);
932                }
933            }
934        }
935        Ok(SplitResult {
936            columns_at_first,
937            columns_at_after,
938            columns_at_last,
939            column_names,
940        })
941    }
942
943    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
944        let table_schema = &self.schema;
945        let mut meta_builder = self.new_meta_builder();
946        let mut columns = Vec::with_capacity(table_schema.num_columns());
947        for column_schema in table_schema.column_schemas() {
948            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
949                // Drop default constraint.
950                ensure!(
951                    column_schema.default_constraint().is_some(),
952                    error::InvalidAlterRequestSnafu {
953                        table: table_name,
954                        err: format!("column {name} does not have a default value"),
955                    }
956                );
957                if !column_schema.is_nullable() {
958                    return error::InvalidAlterRequestSnafu {
959                        table: table_name,
960                        err: format!(
961                            "column {name} is not nullable and `default` cannot be dropped",
962                        ),
963                    }
964                    .fail();
965                }
966                let new_column_schema = column_schema.clone();
967                let new_column_schema = new_column_schema
968                    .with_default_constraint(None)
969                    .with_context(|_| error::SchemaBuildSnafu {
970                        msg: format!("Table {table_name} cannot drop default values"),
971                    })?;
972                columns.push(new_column_schema);
973            } else {
974                columns.push(column_schema.clone());
975            }
976        }
977
978        let mut builder = SchemaBuilder::try_from_columns(columns)
979            .with_context(|_| error::SchemaBuildSnafu {
980                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
981            })?
982            // Also bump the schema version.
983            .version(table_schema.version() + 1);
984        for (k, v) in table_schema.metadata().iter() {
985            builder = builder.add_metadata(k, v);
986        }
987        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
988            msg: format!("Table {table_name} cannot drop default values"),
989        })?;
990
991        let _ = meta_builder.schema(Arc::new(new_schema));
992
993        Ok(meta_builder)
994    }
995
996    fn set_defaults(
997        &self,
998        table_name: &str,
999        set_defaults: &[SetDefaultRequest],
1000    ) -> Result<TableMetaBuilder> {
1001        let table_schema = &self.schema;
1002        let mut meta_builder = self.new_meta_builder();
1003        let mut columns = Vec::with_capacity(table_schema.num_columns());
1004        for column_schema in table_schema.column_schemas() {
1005            if let Some(set_default) = set_defaults
1006                .iter()
1007                .find(|s| s.column_name == column_schema.name)
1008            {
1009                let new_column_schema = column_schema.clone();
1010                let new_column_schema = new_column_schema
1011                    .with_default_constraint(set_default.default_constraint.clone())
1012                    .with_context(|_| error::SchemaBuildSnafu {
1013                        msg: format!("Table {table_name} cannot set default values"),
1014                    })?;
1015                columns.push(new_column_schema);
1016            } else {
1017                columns.push(column_schema.clone());
1018            }
1019        }
1020
1021        let mut builder = SchemaBuilder::try_from_columns(columns)
1022            .with_context(|_| error::SchemaBuildSnafu {
1023                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
1024            })?
1025            // Also bump the schema version.
1026            .version(table_schema.version() + 1);
1027        for (k, v) in table_schema.metadata().iter() {
1028            builder = builder.add_metadata(k, v);
1029        }
1030        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
1031            msg: format!("Table {table_name} cannot set default values"),
1032        })?;
1033
1034        let _ = meta_builder.schema(Arc::new(new_schema));
1035
1036        Ok(meta_builder)
1037    }
1038}
1039
1040#[derive(Clone, Debug, PartialEq, Eq, Builder)]
1041#[builder(pattern = "owned")]
1042pub struct TableInfo {
1043    /// Id and version of the table.
1044    #[builder(default, setter(into))]
1045    pub ident: TableIdent,
1046    /// Name of the table.
1047    #[builder(setter(into))]
1048    pub name: String,
1049    /// Comment of the table.
1050    #[builder(default, setter(into))]
1051    pub desc: Option<String>,
1052    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
1053    pub catalog_name: String,
1054    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
1055    pub schema_name: String,
1056    pub meta: TableMeta,
1057    #[builder(default = "TableType::Base")]
1058    pub table_type: TableType,
1059}
1060
1061pub type TableInfoRef = Arc<TableInfo>;
1062
1063impl TableInfo {
1064    pub fn table_id(&self) -> TableId {
1065        self.ident.table_id
1066    }
1067
1068    pub fn region_ids(&self) -> Vec<RegionId> {
1069        self.meta
1070            .region_numbers
1071            .iter()
1072            .map(|id| RegionId::new(self.table_id(), *id))
1073            .collect()
1074    }
1075    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1076    pub fn full_table_name(&self) -> String {
1077        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1078    }
1079
1080    pub fn get_db_string(&self) -> String {
1081        common_catalog::build_db_string(&self.catalog_name, &self.schema_name)
1082    }
1083
1084    /// Returns true when the table is the metric engine's physical table.
1085    pub fn is_physical_table(&self) -> bool {
1086        self.meta
1087            .options
1088            .extra_options
1089            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1090    }
1091
1092    /// Return true if the table's TTL is `instant`.
1093    pub fn is_ttl_instant_table(&self) -> bool {
1094        self.meta
1095            .options
1096            .ttl
1097            .map(|t| t.is_instant())
1098            .unwrap_or(false)
1099    }
1100}
1101
1102impl TableInfoBuilder {
1103    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1104        Self {
1105            name: Some(name.into()),
1106            meta: Some(meta),
1107            ..Default::default()
1108        }
1109    }
1110
1111    pub fn table_id(mut self, id: TableId) -> Self {
1112        let ident = self.ident.get_or_insert_with(TableIdent::default);
1113        ident.table_id = id;
1114        self
1115    }
1116
1117    pub fn table_version(mut self, version: TableVersion) -> Self {
1118        let ident = self.ident.get_or_insert_with(TableIdent::default);
1119        ident.version = version;
1120        self
1121    }
1122}
1123
1124impl TableIdent {
1125    pub fn new(table_id: TableId) -> Self {
1126        Self {
1127            table_id,
1128            version: 0,
1129        }
1130    }
1131}
1132
1133impl From<TableId> for TableIdent {
1134    fn from(table_id: TableId) -> Self {
1135        Self::new(table_id)
1136    }
1137}
1138
1139/// Struct used to serialize and deserialize [`TableMeta`].
1140#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1141pub struct RawTableMeta {
1142    pub schema: RawSchema,
1143    /// The indices of columns in primary key. Note that the index of timestamp column
1144    /// is not included. Order matters to this array.
1145    pub primary_key_indices: Vec<usize>,
1146    ///  The indices of columns in value. The index of timestamp column is included.
1147    /// Order doesn't matter to this array.
1148    pub value_indices: Vec<usize>,
1149    /// Engine type of this table. Usually in small case.
1150    pub engine: String,
1151    /// Next column id of a new column.
1152    /// It's used to ensure all columns with the same name across all regions have the same column id.
1153    pub next_column_id: ColumnId,
1154    pub region_numbers: Vec<u32>,
1155    pub options: TableOptions,
1156    pub created_on: DateTime<Utc>,
1157    /// Order doesn't matter to this array.
1158    #[serde(default)]
1159    pub partition_key_indices: Vec<usize>,
1160    /// Map of column name to column id.
1161    /// Note: This field may be empty for older versions that did not include this field.
1162    #[serde(default)]
1163    pub column_ids: Vec<ColumnId>,
1164}
1165
1166impl From<TableMeta> for RawTableMeta {
1167    fn from(meta: TableMeta) -> RawTableMeta {
1168        RawTableMeta {
1169            schema: RawSchema::from(&*meta.schema),
1170            primary_key_indices: meta.primary_key_indices,
1171            value_indices: meta.value_indices,
1172            engine: meta.engine,
1173            next_column_id: meta.next_column_id,
1174            region_numbers: meta.region_numbers,
1175            options: meta.options,
1176            created_on: meta.created_on,
1177            partition_key_indices: meta.partition_key_indices,
1178            column_ids: meta.column_ids,
1179        }
1180    }
1181}
1182
1183impl TryFrom<RawTableMeta> for TableMeta {
1184    type Error = ConvertError;
1185
1186    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1187        Ok(TableMeta {
1188            schema: Arc::new(Schema::try_from(raw.schema)?),
1189            primary_key_indices: raw.primary_key_indices,
1190            value_indices: raw.value_indices,
1191            engine: raw.engine,
1192            region_numbers: raw.region_numbers,
1193            next_column_id: raw.next_column_id,
1194            options: raw.options,
1195            created_on: raw.created_on,
1196            partition_key_indices: raw.partition_key_indices,
1197            column_ids: raw.column_ids,
1198        })
1199    }
1200}
1201
1202/// Struct used to serialize and deserialize [`TableInfo`].
1203#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1204pub struct RawTableInfo {
1205    pub ident: TableIdent,
1206    pub name: String,
1207    pub desc: Option<String>,
1208    pub catalog_name: String,
1209    pub schema_name: String,
1210    pub meta: RawTableMeta,
1211    pub table_type: TableType,
1212}
1213
1214impl RawTableInfo {
1215    /// Returns the map of column name to column id.
1216    ///
1217    /// Note: This method may return an empty map for older versions that did not include this field.
1218    pub fn name_to_ids(&self) -> Option<HashMap<String, ColumnId>> {
1219        if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() {
1220            None
1221        } else {
1222            Some(
1223                self.meta
1224                    .column_ids
1225                    .iter()
1226                    .enumerate()
1227                    .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id))
1228                    .collect(),
1229            )
1230        }
1231    }
1232
1233    /// Sort the columns in [RawTableInfo], logical tables require it.
1234    pub fn sort_columns(&mut self) {
1235        let column_schemas = &self.meta.schema.column_schemas;
1236        let primary_keys = self
1237            .meta
1238            .primary_key_indices
1239            .iter()
1240            .map(|index| column_schemas[*index].name.clone())
1241            .collect::<HashSet<_>>();
1242
1243        let name_to_ids = self.name_to_ids().unwrap_or_default();
1244        self.meta
1245            .schema
1246            .column_schemas
1247            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1248
1249        // Compute new indices of sorted columns
1250        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1251        let mut timestamp_index = None;
1252        let mut value_indices =
1253            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
1254        let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
1255        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1256            if primary_keys.contains(&column_schema.name) {
1257                primary_key_indices.push(index);
1258            } else if column_schema.is_time_index() {
1259                value_indices.push(index);
1260                timestamp_index = Some(index);
1261            } else {
1262                value_indices.push(index);
1263            }
1264            if let Some(id) = name_to_ids.get(&column_schema.name) {
1265                column_ids.push(*id);
1266            }
1267        }
1268
1269        // Overwrite table meta
1270        self.meta.schema.timestamp_index = timestamp_index;
1271        self.meta.primary_key_indices = primary_key_indices;
1272        self.meta.value_indices = value_indices;
1273        self.meta.column_ids = column_ids;
1274    }
1275
1276    /// Extracts region options from table info.
1277    ///
1278    /// All "region options" are actually a copy of table options for redundancy.
1279    pub fn to_region_options(&self) -> HashMap<String, String> {
1280        HashMap::from(&self.meta.options)
1281    }
1282
1283    /// Returns the table reference.
1284    pub fn table_ref(&self) -> TableReference {
1285        TableReference::full(
1286            self.catalog_name.as_str(),
1287            self.schema_name.as_str(),
1288            self.name.as_str(),
1289        )
1290    }
1291}
1292
1293impl From<TableInfo> for RawTableInfo {
1294    fn from(info: TableInfo) -> RawTableInfo {
1295        RawTableInfo {
1296            ident: info.ident,
1297            name: info.name,
1298            desc: info.desc,
1299            catalog_name: info.catalog_name,
1300            schema_name: info.schema_name,
1301            meta: RawTableMeta::from(info.meta),
1302            table_type: info.table_type,
1303        }
1304    }
1305}
1306
1307impl TryFrom<RawTableInfo> for TableInfo {
1308    type Error = ConvertError;
1309
1310    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1311        Ok(TableInfo {
1312            ident: raw.ident,
1313            name: raw.name,
1314            desc: raw.desc,
1315            catalog_name: raw.catalog_name,
1316            schema_name: raw.schema_name,
1317            meta: TableMeta::try_from(raw.meta)?,
1318            table_type: raw.table_type,
1319        })
1320    }
1321}
1322
1323/// Set column fulltext options if it passed the validation.
1324///
1325/// Options allowed to modify:
1326/// * backend
1327///
1328/// Options not allowed to modify:
1329/// * analyzer
1330/// * case_sensitive
1331fn set_column_fulltext_options(
1332    column_schema: &mut ColumnSchema,
1333    column_name: &str,
1334    options: &FulltextOptions,
1335    current_options: Option<FulltextOptions>,
1336) -> Result<()> {
1337    if let Some(current_options) = current_options {
1338        ensure!(
1339            current_options.analyzer == options.analyzer
1340                && current_options.case_sensitive == options.case_sensitive,
1341            error::InvalidColumnOptionSnafu {
1342                column_name,
1343                msg: format!(
1344                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1345                    current_options.analyzer, current_options.case_sensitive
1346                ),
1347            }
1348        );
1349    }
1350
1351    column_schema
1352        .set_fulltext_options(options)
1353        .context(error::SetFulltextOptionsSnafu { column_name })?;
1354
1355    Ok(())
1356}
1357
1358fn unset_column_fulltext_options(
1359    column_schema: &mut ColumnSchema,
1360    column_name: &str,
1361    current_options: Option<FulltextOptions>,
1362) -> Result<()> {
1363    ensure!(
1364        current_options
1365            .as_ref()
1366            .is_some_and(|options| options.enable),
1367        error::InvalidColumnOptionSnafu {
1368            column_name,
1369            msg: "FULLTEXT index already disabled".to_string(),
1370        }
1371    );
1372
1373    let mut options = current_options.unwrap();
1374    options.enable = false;
1375    column_schema
1376        .set_fulltext_options(&options)
1377        .context(error::SetFulltextOptionsSnafu { column_name })?;
1378
1379    Ok(())
1380}
1381
1382fn set_column_skipping_index_options(
1383    column_schema: &mut ColumnSchema,
1384    column_name: &str,
1385    options: &SkippingIndexOptions,
1386) -> Result<()> {
1387    column_schema
1388        .set_skipping_options(options)
1389        .context(error::SetSkippingOptionsSnafu { column_name })?;
1390
1391    Ok(())
1392}
1393
1394fn unset_column_skipping_index_options(
1395    column_schema: &mut ColumnSchema,
1396    column_name: &str,
1397) -> Result<()> {
1398    column_schema
1399        .unset_skipping_options()
1400        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1401    Ok(())
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406    use std::assert_matches::assert_matches;
1407
1408    use common_error::ext::ErrorExt;
1409    use common_error::status_code::StatusCode;
1410    use datatypes::data_type::ConcreteDataType;
1411    use datatypes::schema::{
1412        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1413    };
1414
1415    use super::*;
1416    use crate::Error;
1417
1418    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1419    fn new_test_schema() -> Schema {
1420        let column_schemas = vec![
1421            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1422            ColumnSchema::new(
1423                "ts",
1424                ConcreteDataType::timestamp_millisecond_datatype(),
1425                false,
1426            )
1427            .with_time_index(true),
1428            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1429        ];
1430        SchemaBuilder::try_from(column_schemas)
1431            .unwrap()
1432            .version(123)
1433            .build()
1434            .unwrap()
1435    }
1436
1437    #[test]
1438    fn test_raw_convert() {
1439        let schema = Arc::new(new_test_schema());
1440        let meta = TableMetaBuilder::empty()
1441            .schema(schema)
1442            .primary_key_indices(vec![0])
1443            .engine("engine")
1444            .next_column_id(3)
1445            .build()
1446            .unwrap();
1447        let info = TableInfoBuilder::default()
1448            .table_id(10)
1449            .table_version(5)
1450            .name("mytable")
1451            .meta(meta)
1452            .build()
1453            .unwrap();
1454
1455        let raw = RawTableInfo::from(info.clone());
1456        let info_new = TableInfo::try_from(raw).unwrap();
1457
1458        assert_eq!(info, info_new);
1459    }
1460
1461    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1462        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1463        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1464        let alter_kind = AlterKind::AddColumns {
1465            columns: vec![
1466                AddColumnRequest {
1467                    column_schema: new_tag,
1468                    is_key: true,
1469                    location: None,
1470                    add_if_not_exists: false,
1471                },
1472                AddColumnRequest {
1473                    column_schema: new_field,
1474                    is_key: false,
1475                    location: None,
1476                    add_if_not_exists: false,
1477                },
1478            ],
1479        };
1480
1481        let builder = meta
1482            .builder_with_alter_kind("my_table", &alter_kind)
1483            .unwrap();
1484        builder.build().unwrap()
1485    }
1486
1487    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1488        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1489        let new_field = ColumnSchema::new(
1490            "my_field_after_ts",
1491            ConcreteDataType::string_datatype(),
1492            true,
1493        );
1494        let yet_another_field = ColumnSchema::new(
1495            "yet_another_field_after_ts",
1496            ConcreteDataType::int64_datatype(),
1497            true,
1498        );
1499        let alter_kind = AlterKind::AddColumns {
1500            columns: vec![
1501                AddColumnRequest {
1502                    column_schema: new_tag,
1503                    is_key: true,
1504                    location: Some(AddColumnLocation::First),
1505                    add_if_not_exists: false,
1506                },
1507                AddColumnRequest {
1508                    column_schema: new_field,
1509                    is_key: false,
1510                    location: Some(AddColumnLocation::After {
1511                        column_name: "ts".to_string(),
1512                    }),
1513                    add_if_not_exists: false,
1514                },
1515                AddColumnRequest {
1516                    column_schema: yet_another_field,
1517                    is_key: true,
1518                    location: Some(AddColumnLocation::After {
1519                        column_name: "ts".to_string(),
1520                    }),
1521                    add_if_not_exists: false,
1522                },
1523            ],
1524        };
1525
1526        let builder = meta
1527            .builder_with_alter_kind("my_table", &alter_kind)
1528            .unwrap();
1529        builder.build().unwrap()
1530    }
1531
1532    #[test]
1533    fn test_add_columns() {
1534        let schema = Arc::new(new_test_schema());
1535        let meta = TableMetaBuilder::empty()
1536            .schema(schema)
1537            .primary_key_indices(vec![0])
1538            .engine("engine")
1539            .next_column_id(3)
1540            .build()
1541            .unwrap();
1542
1543        let new_meta = add_columns_to_meta(&meta);
1544        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1545
1546        let names: Vec<String> = new_meta
1547            .schema
1548            .column_schemas()
1549            .iter()
1550            .map(|column_schema| column_schema.name.clone())
1551            .collect();
1552        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1553        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1554        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1555    }
1556
1557    #[test]
1558    fn test_add_columns_multiple_times() {
1559        let schema = Arc::new(new_test_schema());
1560        let meta = TableMetaBuilder::empty()
1561            .schema(schema)
1562            .primary_key_indices(vec![0])
1563            .engine("engine")
1564            .next_column_id(3)
1565            .build()
1566            .unwrap();
1567
1568        let alter_kind = AlterKind::AddColumns {
1569            columns: vec![
1570                AddColumnRequest {
1571                    column_schema: ColumnSchema::new(
1572                        "col3",
1573                        ConcreteDataType::int32_datatype(),
1574                        true,
1575                    ),
1576                    is_key: true,
1577                    location: None,
1578                    add_if_not_exists: true,
1579                },
1580                AddColumnRequest {
1581                    column_schema: ColumnSchema::new(
1582                        "col3",
1583                        ConcreteDataType::int32_datatype(),
1584                        true,
1585                    ),
1586                    is_key: true,
1587                    location: None,
1588                    add_if_not_exists: true,
1589                },
1590            ],
1591        };
1592        let err = meta
1593            .builder_with_alter_kind("my_table", &alter_kind)
1594            .err()
1595            .unwrap();
1596        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1597    }
1598
1599    #[test]
1600    fn test_remove_columns() {
1601        let schema = Arc::new(new_test_schema());
1602        let meta = TableMetaBuilder::empty()
1603            .schema(schema.clone())
1604            .primary_key_indices(vec![0])
1605            .engine("engine")
1606            .next_column_id(3)
1607            .build()
1608            .unwrap();
1609        // Add more columns so we have enough candidate columns to remove.
1610        let meta = add_columns_to_meta(&meta);
1611
1612        let alter_kind = AlterKind::DropColumns {
1613            names: vec![String::from("col2"), String::from("my_field")],
1614        };
1615        let new_meta = meta
1616            .builder_with_alter_kind("my_table", &alter_kind)
1617            .unwrap()
1618            .build()
1619            .unwrap();
1620
1621        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1622
1623        let names: Vec<String> = new_meta
1624            .schema
1625            .column_schemas()
1626            .iter()
1627            .map(|column_schema| column_schema.name.clone())
1628            .collect();
1629        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1630        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1631        assert_eq!(&[1], &new_meta.value_indices[..]);
1632        assert_eq!(
1633            schema.timestamp_column(),
1634            new_meta.schema.timestamp_column()
1635        );
1636    }
1637
1638    #[test]
1639    fn test_remove_multiple_columns_before_timestamp() {
1640        let column_schemas = vec![
1641            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1642            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1643            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1644            ColumnSchema::new(
1645                "ts",
1646                ConcreteDataType::timestamp_millisecond_datatype(),
1647                false,
1648            )
1649            .with_time_index(true),
1650        ];
1651        let schema = Arc::new(
1652            SchemaBuilder::try_from(column_schemas)
1653                .unwrap()
1654                .version(123)
1655                .build()
1656                .unwrap(),
1657        );
1658        let meta = TableMetaBuilder::empty()
1659            .schema(schema.clone())
1660            .primary_key_indices(vec![1])
1661            .engine("engine")
1662            .next_column_id(4)
1663            .build()
1664            .unwrap();
1665
1666        // Remove columns in reverse order to test whether timestamp index is valid.
1667        let alter_kind = AlterKind::DropColumns {
1668            names: vec![String::from("col3"), String::from("col1")],
1669        };
1670        let new_meta = meta
1671            .builder_with_alter_kind("my_table", &alter_kind)
1672            .unwrap()
1673            .build()
1674            .unwrap();
1675
1676        let names: Vec<String> = new_meta
1677            .schema
1678            .column_schemas()
1679            .iter()
1680            .map(|column_schema| column_schema.name.clone())
1681            .collect();
1682        assert_eq!(&["col2", "ts"], &names[..]);
1683        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1684        assert_eq!(&[1], &new_meta.value_indices[..]);
1685        assert_eq!(
1686            schema.timestamp_column(),
1687            new_meta.schema.timestamp_column()
1688        );
1689    }
1690
1691    #[test]
1692    fn test_add_existing_column() {
1693        let schema = Arc::new(new_test_schema());
1694        let meta = TableMetaBuilder::empty()
1695            .schema(schema)
1696            .primary_key_indices(vec![0])
1697            .engine("engine")
1698            .next_column_id(3)
1699            .build()
1700            .unwrap();
1701
1702        let alter_kind = AlterKind::AddColumns {
1703            columns: vec![AddColumnRequest {
1704                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1705                is_key: false,
1706                location: None,
1707                add_if_not_exists: false,
1708            }],
1709        };
1710
1711        let err = meta
1712            .builder_with_alter_kind("my_table", &alter_kind)
1713            .err()
1714            .unwrap();
1715        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1716
1717        // Add if not exists
1718        let alter_kind = AlterKind::AddColumns {
1719            columns: vec![AddColumnRequest {
1720                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1721                is_key: true,
1722                location: None,
1723                add_if_not_exists: true,
1724            }],
1725        };
1726        let new_meta = meta
1727            .builder_with_alter_kind("my_table", &alter_kind)
1728            .unwrap()
1729            .build()
1730            .unwrap();
1731        assert_eq!(
1732            meta.schema.column_schemas(),
1733            new_meta.schema.column_schemas()
1734        );
1735        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1736    }
1737
1738    #[test]
1739    fn test_add_different_type_column() {
1740        let schema = Arc::new(new_test_schema());
1741        let meta = TableMetaBuilder::empty()
1742            .schema(schema)
1743            .primary_key_indices(vec![0])
1744            .engine("engine")
1745            .next_column_id(3)
1746            .build()
1747            .unwrap();
1748
1749        // Add if not exists, but different type.
1750        let alter_kind = AlterKind::AddColumns {
1751            columns: vec![AddColumnRequest {
1752                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1753                is_key: false,
1754                location: None,
1755                add_if_not_exists: true,
1756            }],
1757        };
1758        let err = meta
1759            .builder_with_alter_kind("my_table", &alter_kind)
1760            .err()
1761            .unwrap();
1762        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1763    }
1764
1765    #[test]
1766    fn test_add_invalid_column() {
1767        let schema = Arc::new(new_test_schema());
1768        let meta = TableMetaBuilder::empty()
1769            .schema(schema)
1770            .primary_key_indices(vec![0])
1771            .engine("engine")
1772            .next_column_id(3)
1773            .build()
1774            .unwrap();
1775
1776        // Not nullable and no default value.
1777        let alter_kind = AlterKind::AddColumns {
1778            columns: vec![AddColumnRequest {
1779                column_schema: ColumnSchema::new(
1780                    "weny",
1781                    ConcreteDataType::string_datatype(),
1782                    false,
1783                ),
1784                is_key: false,
1785                location: None,
1786                add_if_not_exists: false,
1787            }],
1788        };
1789
1790        let err = meta
1791            .builder_with_alter_kind("my_table", &alter_kind)
1792            .err()
1793            .unwrap();
1794        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1795    }
1796
1797    #[test]
1798    fn test_remove_unknown_column() {
1799        let schema = Arc::new(new_test_schema());
1800        let meta = TableMetaBuilder::empty()
1801            .schema(schema)
1802            .primary_key_indices(vec![0])
1803            .engine("engine")
1804            .next_column_id(3)
1805            .build()
1806            .unwrap();
1807
1808        let alter_kind = AlterKind::DropColumns {
1809            names: vec![String::from("unknown")],
1810        };
1811
1812        let err = meta
1813            .builder_with_alter_kind("my_table", &alter_kind)
1814            .err()
1815            .unwrap();
1816        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1817    }
1818
1819    #[test]
1820    fn test_change_unknown_column_data_type() {
1821        let schema = Arc::new(new_test_schema());
1822        let meta = TableMetaBuilder::empty()
1823            .schema(schema)
1824            .primary_key_indices(vec![0])
1825            .engine("engine")
1826            .next_column_id(3)
1827            .build()
1828            .unwrap();
1829
1830        let alter_kind = AlterKind::ModifyColumnTypes {
1831            columns: vec![ModifyColumnTypeRequest {
1832                column_name: "unknown".to_string(),
1833                target_type: ConcreteDataType::string_datatype(),
1834            }],
1835        };
1836
1837        let err = meta
1838            .builder_with_alter_kind("my_table", &alter_kind)
1839            .err()
1840            .unwrap();
1841        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1842    }
1843
1844    #[test]
1845    fn test_remove_key_column() {
1846        let schema = Arc::new(new_test_schema());
1847        let meta = TableMetaBuilder::empty()
1848            .schema(schema)
1849            .primary_key_indices(vec![0])
1850            .engine("engine")
1851            .next_column_id(3)
1852            .build()
1853            .unwrap();
1854
1855        // Remove column in primary key.
1856        let alter_kind = AlterKind::DropColumns {
1857            names: vec![String::from("col1")],
1858        };
1859
1860        let err = meta
1861            .builder_with_alter_kind("my_table", &alter_kind)
1862            .err()
1863            .unwrap();
1864        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1865
1866        // Remove timestamp column.
1867        let alter_kind = AlterKind::DropColumns {
1868            names: vec![String::from("ts")],
1869        };
1870
1871        let err = meta
1872            .builder_with_alter_kind("my_table", &alter_kind)
1873            .err()
1874            .unwrap();
1875        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1876    }
1877
1878    #[test]
1879    fn test_remove_partition_column() {
1880        let schema = Arc::new(new_test_schema());
1881        let meta = TableMetaBuilder::empty()
1882            .schema(schema)
1883            .primary_key_indices(vec![])
1884            .partition_key_indices(vec![0])
1885            .engine("engine")
1886            .next_column_id(3)
1887            .build()
1888            .unwrap();
1889        // Remove column in primary key.
1890        let alter_kind = AlterKind::DropColumns {
1891            names: vec![String::from("col1")],
1892        };
1893
1894        let err = meta
1895            .builder_with_alter_kind("my_table", &alter_kind)
1896            .err()
1897            .unwrap();
1898        assert_matches!(err, Error::RemovePartitionColumn { .. });
1899    }
1900
1901    #[test]
1902    fn test_change_key_column_data_type() {
1903        let schema = Arc::new(new_test_schema());
1904        let meta = TableMetaBuilder::empty()
1905            .schema(schema)
1906            .primary_key_indices(vec![0])
1907            .engine("engine")
1908            .next_column_id(3)
1909            .build()
1910            .unwrap();
1911
1912        // Remove column in primary key.
1913        let alter_kind = AlterKind::ModifyColumnTypes {
1914            columns: vec![ModifyColumnTypeRequest {
1915                column_name: "col1".to_string(),
1916                target_type: ConcreteDataType::string_datatype(),
1917            }],
1918        };
1919
1920        let err = meta
1921            .builder_with_alter_kind("my_table", &alter_kind)
1922            .err()
1923            .unwrap();
1924        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1925
1926        // Remove timestamp column.
1927        let alter_kind = AlterKind::ModifyColumnTypes {
1928            columns: vec![ModifyColumnTypeRequest {
1929                column_name: "ts".to_string(),
1930                target_type: ConcreteDataType::string_datatype(),
1931            }],
1932        };
1933
1934        let err = meta
1935            .builder_with_alter_kind("my_table", &alter_kind)
1936            .err()
1937            .unwrap();
1938        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1939    }
1940
1941    #[test]
1942    fn test_alloc_new_column() {
1943        let schema = Arc::new(new_test_schema());
1944        let mut meta = TableMetaBuilder::empty()
1945            .schema(schema)
1946            .primary_key_indices(vec![0])
1947            .engine("engine")
1948            .next_column_id(3)
1949            .build()
1950            .unwrap();
1951        assert_eq!(3, meta.next_column_id);
1952
1953        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1954        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1955
1956        assert_eq!(4, meta.next_column_id);
1957        assert_eq!(column_schema.name, desc.name);
1958    }
1959
1960    #[test]
1961    fn test_add_columns_with_location() {
1962        let schema = Arc::new(new_test_schema());
1963        let meta = TableMetaBuilder::empty()
1964            .schema(schema)
1965            .primary_key_indices(vec![0])
1966            // partition col: col1, col2
1967            .partition_key_indices(vec![0, 2])
1968            .engine("engine")
1969            .next_column_id(3)
1970            .build()
1971            .unwrap();
1972
1973        let new_meta = add_columns_to_meta_with_location(&meta);
1974        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1975
1976        let names: Vec<String> = new_meta
1977            .schema
1978            .column_schemas()
1979            .iter()
1980            .map(|column_schema| column_schema.name.clone())
1981            .collect();
1982        assert_eq!(
1983            &[
1984                "my_tag_first",               // primary key column
1985                "col1",                       // partition column
1986                "ts",                         // timestamp column
1987                "yet_another_field_after_ts", // primary key column
1988                "my_field_after_ts",          // value column
1989                "col2",                       // partition column
1990            ],
1991            &names[..]
1992        );
1993        assert_eq!(&[0, 1, 3], &new_meta.primary_key_indices[..]);
1994        assert_eq!(&[2, 4, 5], &new_meta.value_indices[..]);
1995        assert_eq!(&[1, 5], &new_meta.partition_key_indices[..]);
1996    }
1997
1998    #[test]
1999    fn test_modify_column_fulltext_options() {
2000        let schema = Arc::new(new_test_schema());
2001        let meta = TableMetaBuilder::empty()
2002            .schema(schema)
2003            .primary_key_indices(vec![0])
2004            .engine("engine")
2005            .next_column_id(3)
2006            .build()
2007            .unwrap();
2008
2009        let alter_kind = AlterKind::SetIndexes {
2010            options: vec![SetIndexOption::Fulltext {
2011                column_name: "col1".to_string(),
2012                options: FulltextOptions::default(),
2013            }],
2014        };
2015        let err = meta
2016            .builder_with_alter_kind("my_table", &alter_kind)
2017            .err()
2018            .unwrap();
2019        assert_eq!(
2020            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
2021            err.to_string()
2022        );
2023
2024        // Add a string column and make it fulltext indexed
2025        let new_meta = add_columns_to_meta_with_location(&meta);
2026        assert_eq!(meta.region_numbers, new_meta.region_numbers);
2027
2028        let alter_kind = AlterKind::SetIndexes {
2029            options: vec![SetIndexOption::Fulltext {
2030                column_name: "my_tag_first".to_string(),
2031                options: FulltextOptions::new_unchecked(
2032                    true,
2033                    FulltextAnalyzer::Chinese,
2034                    true,
2035                    FulltextBackend::Bloom,
2036                    1000,
2037                    0.01,
2038                ),
2039            }],
2040        };
2041        let new_meta = new_meta
2042            .builder_with_alter_kind("my_table", &alter_kind)
2043            .unwrap()
2044            .build()
2045            .unwrap();
2046        let column_schema = new_meta
2047            .schema
2048            .column_schema_by_name("my_tag_first")
2049            .unwrap();
2050        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2051        assert!(fulltext_options.enable);
2052        assert_eq!(
2053            datatypes::schema::FulltextAnalyzer::Chinese,
2054            fulltext_options.analyzer
2055        );
2056        assert!(fulltext_options.case_sensitive);
2057
2058        let alter_kind = AlterKind::UnsetIndexes {
2059            options: vec![UnsetIndexOption::Fulltext {
2060                column_name: "my_tag_first".to_string(),
2061            }],
2062        };
2063        let new_meta = new_meta
2064            .builder_with_alter_kind("my_table", &alter_kind)
2065            .unwrap()
2066            .build()
2067            .unwrap();
2068        let column_schema = new_meta
2069            .schema
2070            .column_schema_by_name("my_tag_first")
2071            .unwrap();
2072        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
2073        assert!(!fulltext_options.enable);
2074    }
2075}