table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39    UnsetIndexOptions,
40};
41
42pub type TableId = u32;
43pub type TableVersion = u64;
44
45/// Indicates whether and how a filter expression can be handled by a
46/// Table for table scans.
47#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
48pub enum FilterPushDownType {
49    /// The expression cannot be used by the provider.
50    Unsupported,
51    /// The expression can be used to help minimise the data retrieved,
52    /// but the provider cannot guarantee that all returned tuples
53    /// satisfy the filter. The Filter plan node containing this expression
54    /// will be preserved.
55    Inexact,
56    /// The provider guarantees that all returned data satisfies this
57    /// filter expression. The Filter plan node containing this expression
58    /// will be removed.
59    Exact,
60}
61
62impl From<TableProviderFilterPushDown> for FilterPushDownType {
63    fn from(value: TableProviderFilterPushDown) -> Self {
64        match value {
65            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
66            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
67            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
68        }
69    }
70}
71
72impl From<FilterPushDownType> for TableProviderFilterPushDown {
73    fn from(value: FilterPushDownType) -> Self {
74        match value {
75            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
76            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
77            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
78        }
79    }
80}
81
82/// Indicates the type of this table for metadata/catalog purposes.
83#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
84pub enum TableType {
85    /// An ordinary physical table.
86    Base,
87    /// A non-materialised table that itself uses a query internally to provide data.
88    View,
89    /// A transient table.
90    Temporary,
91}
92
93impl std::fmt::Display for TableType {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        match self {
96            TableType::Base => f.write_str("BASE TABLE"),
97            TableType::Temporary => f.write_str("TEMPORARY"),
98            TableType::View => f.write_str("VIEW"),
99        }
100    }
101}
102
103/// Identifier of the table.
104#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
105pub struct TableIdent {
106    /// Unique id of this table.
107    pub table_id: TableId,
108    /// Version of the table, bumped when metadata (such as schema) of the table
109    /// being changed.
110    pub version: TableVersion,
111}
112
113/// The table metadata.
114///
115/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
116#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
117#[builder(pattern = "mutable", custom_constructor)]
118pub struct TableMeta {
119    pub schema: SchemaRef,
120    /// The indices of columns in primary key. Note that the index of timestamp column
121    /// is not included in these indices.
122    pub primary_key_indices: Vec<usize>,
123    #[builder(default = "self.default_value_indices()?")]
124    pub value_indices: Vec<usize>,
125    #[builder(default, setter(into))]
126    pub engine: String,
127    #[builder(default, setter(into))]
128    pub region_numbers: Vec<u32>,
129    pub next_column_id: ColumnId,
130    /// Table options.
131    #[builder(default)]
132    pub options: TableOptions,
133    #[builder(default = "Utc::now()")]
134    pub created_on: DateTime<Utc>,
135    #[builder(default = "Vec::new()")]
136    pub partition_key_indices: Vec<usize>,
137}
138
139impl TableMetaBuilder {
140    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
141    #[cfg(any(test, feature = "testing"))]
142    pub fn empty() -> Self {
143        Self {
144            schema: None,
145            primary_key_indices: None,
146            value_indices: None,
147            engine: None,
148            region_numbers: None,
149            next_column_id: None,
150            options: None,
151            created_on: None,
152            partition_key_indices: None,
153        }
154    }
155}
156
157impl TableMetaBuilder {
158    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
159        match (&self.primary_key_indices, &self.schema) {
160            (Some(v), Some(schema)) => {
161                let column_schemas = schema.column_schemas();
162                Ok((0..column_schemas.len())
163                    .filter(|idx| !v.contains(idx))
164                    .collect())
165            }
166            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
167        }
168    }
169
170    pub fn new_external_table() -> Self {
171        Self {
172            schema: None,
173            primary_key_indices: Some(Vec::new()),
174            value_indices: Some(Vec::new()),
175            engine: None,
176            region_numbers: Some(Vec::new()),
177            next_column_id: Some(0),
178            options: None,
179            created_on: None,
180            partition_key_indices: None,
181        }
182    }
183}
184
185/// The result after splitting requests by column location info.
186struct SplitResult<'a> {
187    /// column requests should be added at first place.
188    columns_at_first: Vec<&'a AddColumnRequest>,
189    /// column requests should be added after already exist columns.
190    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
191    /// column requests should be added at last place.
192    columns_at_last: Vec<&'a AddColumnRequest>,
193    /// all column names should be added.
194    column_names: Vec<String>,
195}
196
197impl TableMeta {
198    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
199        let columns_schemas = &self.schema.column_schemas();
200        self.primary_key_indices
201            .iter()
202            .map(|idx| &columns_schemas[*idx].name)
203    }
204
205    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
206        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
207        let columns_schemas = self.schema.column_schemas();
208        let primary_key_indices = &self.primary_key_indices;
209        columns_schemas
210            .iter()
211            .enumerate()
212            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
213            .map(|(_, cs)| &cs.name)
214    }
215
216    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
217    ///
218    /// The returned builder would derive the next column id of this meta.
219    pub fn builder_with_alter_kind(
220        &self,
221        table_name: &str,
222        alter_kind: &AlterKind,
223    ) -> Result<TableMetaBuilder> {
224        match alter_kind {
225            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
226            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
227            AlterKind::ModifyColumnTypes { columns } => {
228                self.modify_column_types(table_name, columns)
229            }
230            // No need to rebuild table meta when renaming tables.
231            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
232            AlterKind::SetTableOptions { options } => self.set_table_options(options),
233            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
234            AlterKind::SetIndex { options } => match options {
235                SetIndexOptions::Fulltext {
236                    column_name,
237                    options,
238                } => self.change_column_fulltext_options(
239                    table_name,
240                    column_name,
241                    true,
242                    Some(options),
243                ),
244                SetIndexOptions::Inverted { column_name } => {
245                    self.change_column_modify_inverted_index(table_name, column_name, true)
246                }
247                SetIndexOptions::Skipping {
248                    column_name,
249                    options,
250                } => self.change_column_skipping_index_options(
251                    table_name,
252                    column_name,
253                    Some(options),
254                ),
255            },
256            AlterKind::UnsetIndex { options } => match options {
257                UnsetIndexOptions::Fulltext { column_name } => {
258                    self.change_column_fulltext_options(table_name, column_name, false, None)
259                }
260                UnsetIndexOptions::Inverted { column_name } => {
261                    self.change_column_modify_inverted_index(table_name, column_name, false)
262                }
263                UnsetIndexOptions::Skipping { column_name } => {
264                    self.change_column_skipping_index_options(table_name, column_name, None)
265                }
266            },
267        }
268    }
269
270    /// Creates a [TableMetaBuilder] with modified table options.
271    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
272        let mut new_options = self.options.clone();
273
274        for request in requests {
275            match request {
276                SetRegionOption::Ttl(new_ttl) => {
277                    new_options.ttl = *new_ttl;
278                }
279                SetRegionOption::Twsc(key, value) => {
280                    if !value.is_empty() {
281                        new_options
282                            .extra_options
283                            .insert(key.to_string(), value.to_string());
284                        // Ensure node restart correctly.
285                        new_options.extra_options.insert(
286                            COMPACTION_TYPE.to_string(),
287                            COMPACTION_TYPE_TWCS.to_string(),
288                        );
289                    } else {
290                        // Invalidate the previous change option if an empty value has been set.
291                        new_options.extra_options.remove(key.as_str());
292                    }
293                }
294            }
295        }
296        let mut builder = self.new_meta_builder();
297        builder.options(new_options);
298
299        Ok(builder)
300    }
301
302    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
303        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
304        self.set_table_options(&requests)
305    }
306
307    /// Creates a [TableMetaBuilder] with modified column inverted index.
308    fn change_column_modify_inverted_index(
309        &self,
310        table_name: &str,
311        column_name: &str,
312        value: bool,
313    ) -> Result<TableMetaBuilder> {
314        let table_schema = &self.schema;
315        let mut meta_builder = self.new_meta_builder();
316
317        let mut columns: Vec<ColumnSchema> =
318            Vec::with_capacity(table_schema.column_schemas().len());
319
320        for column_schema in table_schema.column_schemas().iter() {
321            if column_schema.name == column_name {
322                let mut new_column_schema = column_schema.clone();
323                new_column_schema.set_inverted_index(value);
324                columns.push(new_column_schema);
325            } else {
326                columns.push(column_schema.clone());
327            }
328        }
329
330        // TODO(CookiePieWw): This part for all alter table operations is similar. We can refactor it.
331        let mut builder = SchemaBuilder::try_from_columns(columns)
332            .with_context(|_| error::SchemaBuildSnafu {
333                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
334            })?
335            .version(table_schema.version() + 1);
336
337        for (k, v) in table_schema.metadata().iter() {
338            builder = builder.add_metadata(k, v);
339        }
340
341        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
342            msg: format!(
343                "Table {table_name} cannot change fulltext options for column {column_name}",
344            ),
345        })?;
346
347        let _ = meta_builder
348            .schema(Arc::new(new_schema))
349            .primary_key_indices(self.primary_key_indices.clone());
350
351        Ok(meta_builder)
352    }
353
354    /// Creates a [TableMetaBuilder] with modified column fulltext options.
355    fn change_column_fulltext_options(
356        &self,
357        table_name: &str,
358        column_name: &str,
359        enable: bool,
360        options: Option<&FulltextOptions>,
361    ) -> Result<TableMetaBuilder> {
362        let table_schema = &self.schema;
363        let mut meta_builder = self.new_meta_builder();
364
365        let column = &table_schema
366            .column_schema_by_name(column_name)
367            .with_context(|| error::ColumnNotExistsSnafu {
368                column_name,
369                table_name,
370            })?;
371
372        ensure!(
373            column.data_type.is_string(),
374            error::InvalidColumnOptionSnafu {
375                column_name,
376                msg: "FULLTEXT index only supports string type",
377            }
378        );
379
380        let current_fulltext_options = column
381            .fulltext_options()
382            .context(error::SetFulltextOptionsSnafu { column_name })?;
383
384        let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
385        for column_schema in table_schema.column_schemas() {
386            if column_schema.name == column_name {
387                let mut new_column_schema = column_schema.clone();
388                if enable {
389                    ensure!(
390                        options.is_some(),
391                        error::InvalidColumnOptionSnafu {
392                            column_name,
393                            msg: "FULLTEXT index options must be provided",
394                        }
395                    );
396                    set_column_fulltext_options(
397                        &mut new_column_schema,
398                        column_name,
399                        options.unwrap(),
400                        current_fulltext_options.clone(),
401                    )?
402                } else {
403                    unset_column_fulltext_options(
404                        &mut new_column_schema,
405                        column_name,
406                        current_fulltext_options.clone(),
407                    )?
408                }
409                columns.push(new_column_schema);
410            } else {
411                columns.push(column_schema.clone());
412            }
413        }
414
415        // TODO(CookiePieWw): This part for all alter table operations is similar. We can refactor it.
416        let mut builder = SchemaBuilder::try_from_columns(columns)
417            .with_context(|_| error::SchemaBuildSnafu {
418                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
419            })?
420            .version(table_schema.version() + 1);
421
422        for (k, v) in table_schema.metadata().iter() {
423            builder = builder.add_metadata(k, v);
424        }
425
426        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
427            msg: format!(
428                "Table {table_name} cannot change fulltext options for column {column_name}",
429            ),
430        })?;
431
432        let _ = meta_builder
433            .schema(Arc::new(new_schema))
434            .primary_key_indices(self.primary_key_indices.clone());
435
436        Ok(meta_builder)
437    }
438
439    /// Creates a [TableMetaBuilder] with modified column skipping index options.
440    fn change_column_skipping_index_options(
441        &self,
442        table_name: &str,
443        column_name: &str,
444        options: Option<&SkippingIndexOptions>,
445    ) -> Result<TableMetaBuilder> {
446        let table_schema = &self.schema;
447        let mut meta_builder = self.new_meta_builder();
448
449        let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
450        for column_schema in table_schema.column_schemas() {
451            if column_schema.name == column_name {
452                let mut new_column_schema = column_schema.clone();
453                if let Some(options) = options {
454                    set_column_skipping_index_options(
455                        &mut new_column_schema,
456                        column_name,
457                        options,
458                    )?;
459                } else {
460                    unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
461                }
462                columns.push(new_column_schema);
463            } else {
464                columns.push(column_schema.clone());
465            }
466        }
467
468        let mut builder = SchemaBuilder::try_from_columns(columns)
469            .with_context(|_| error::SchemaBuildSnafu {
470                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
471            })?
472            .version(table_schema.version() + 1);
473
474        for (k, v) in table_schema.metadata().iter() {
475            builder = builder.add_metadata(k, v);
476        }
477
478        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
479            msg: format!("Failed to convert column schemas into schema for table {table_name}"),
480        })?;
481
482        let _ = meta_builder
483            .schema(Arc::new(new_schema))
484            .primary_key_indices(self.primary_key_indices.clone());
485
486        Ok(meta_builder)
487    }
488
489    // TODO(yingwen): Remove this.
490    /// Allocate a new column for the table.
491    ///
492    /// This method would bump the `next_column_id` of the meta.
493    pub fn alloc_new_column(
494        &mut self,
495        table_name: &str,
496        new_column: &ColumnSchema,
497    ) -> Result<ColumnDescriptor> {
498        let desc = ColumnDescriptorBuilder::new(
499            self.next_column_id as ColumnId,
500            &new_column.name,
501            new_column.data_type.clone(),
502        )
503        .is_nullable(new_column.is_nullable())
504        .default_constraint(new_column.default_constraint().cloned())
505        .build()
506        .context(error::BuildColumnDescriptorSnafu {
507            table_name,
508            column_name: &new_column.name,
509        })?;
510
511        // Bump next column id.
512        self.next_column_id += 1;
513
514        Ok(desc)
515    }
516
517    /// Create a [`TableMetaBuilder`] from the current TableMeta.
518    fn new_meta_builder(&self) -> TableMetaBuilder {
519        let mut builder = TableMetaBuilder::from(self);
520        // Manually remove value_indices.
521        builder.value_indices = None;
522        builder
523    }
524
525    // TODO(yingwen): Tests add if not exists.
526    fn add_columns(
527        &self,
528        table_name: &str,
529        requests: &[AddColumnRequest],
530    ) -> Result<TableMetaBuilder> {
531        let table_schema = &self.schema;
532        let mut meta_builder = self.new_meta_builder();
533        let original_primary_key_indices: HashSet<&usize> =
534            self.primary_key_indices.iter().collect();
535
536        let mut names = HashSet::with_capacity(requests.len());
537        let mut new_columns = Vec::with_capacity(requests.len());
538        for col_to_add in requests {
539            if let Some(column_schema) =
540                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
541            {
542                // If the column already exists.
543                ensure!(
544                    col_to_add.add_if_not_exists,
545                    error::ColumnExistsSnafu {
546                        table_name,
547                        column_name: &col_to_add.column_schema.name
548                    },
549                );
550
551                // Checks if the type is the same
552                ensure!(
553                    column_schema.data_type == col_to_add.column_schema.data_type,
554                    error::InvalidAlterRequestSnafu {
555                        table: table_name,
556                        err: format!(
557                            "column {} already exists with different type {:?}",
558                            col_to_add.column_schema.name, column_schema.data_type,
559                        ),
560                    }
561                );
562            } else {
563                // A new column.
564                // Ensures we only add a column once.
565                ensure!(
566                    names.insert(&col_to_add.column_schema.name),
567                    error::InvalidAlterRequestSnafu {
568                        table: table_name,
569                        err: format!(
570                            "add column {} more than once",
571                            col_to_add.column_schema.name
572                        ),
573                    }
574                );
575
576                ensure!(
577                    col_to_add.column_schema.is_nullable()
578                        || col_to_add.column_schema.default_constraint().is_some(),
579                    error::InvalidAlterRequestSnafu {
580                        table: table_name,
581                        err: format!(
582                            "no default value for column {}",
583                            col_to_add.column_schema.name
584                        ),
585                    },
586                );
587
588                new_columns.push(col_to_add.clone());
589            }
590        }
591        let requests = &new_columns[..];
592
593        let SplitResult {
594            columns_at_first,
595            columns_at_after,
596            columns_at_last,
597            column_names,
598        } = self.split_requests_by_column_location(table_name, requests)?;
599        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
600        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
601        // add new columns with FIRST, and in reverse order of requests.
602        columns_at_first.iter().rev().for_each(|request| {
603            if request.is_key {
604                // If a key column is added, we also need to store its index in primary_key_indices.
605                primary_key_indices.push(columns.len());
606            }
607            columns.push(request.column_schema.clone());
608        });
609        // add existed columns in original order and handle new columns with AFTER.
610        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
611            if original_primary_key_indices.contains(&index) {
612                primary_key_indices.push(columns.len());
613            }
614            columns.push(column_schema.clone());
615            if let Some(requests) = columns_at_after.get(&column_schema.name) {
616                requests.iter().rev().for_each(|request| {
617                    if request.is_key {
618                        // If a key column is added, we also need to store its index in primary_key_indices.
619                        primary_key_indices.push(columns.len());
620                    }
621                    columns.push(request.column_schema.clone());
622                });
623            }
624        }
625        // add new columns without location info to last.
626        columns_at_last.iter().for_each(|request| {
627            if request.is_key {
628                // If a key column is added, we also need to store its index in primary_key_indices.
629                primary_key_indices.push(columns.len());
630            }
631            columns.push(request.column_schema.clone());
632        });
633
634        let mut builder = SchemaBuilder::try_from(columns)
635            .with_context(|_| error::SchemaBuildSnafu {
636                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
637            })?
638            // Also bump the schema version.
639            .version(table_schema.version() + 1);
640        for (k, v) in table_schema.metadata().iter() {
641            builder = builder.add_metadata(k, v);
642        }
643        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
644            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
645        })?;
646
647        // value_indices would be generated automatically.
648        let _ = meta_builder
649            .schema(Arc::new(new_schema))
650            .primary_key_indices(primary_key_indices);
651
652        Ok(meta_builder)
653    }
654
655    fn remove_columns(
656        &self,
657        table_name: &str,
658        column_names: &[String],
659    ) -> Result<TableMetaBuilder> {
660        let table_schema = &self.schema;
661        let column_names: HashSet<_> = column_names.iter().collect();
662        let mut meta_builder = self.new_meta_builder();
663
664        let timestamp_index = table_schema.timestamp_index();
665        // Check whether columns are existing and not in primary key index.
666        for column_name in &column_names {
667            if let Some(index) = table_schema.column_index_by_name(column_name) {
668                // This is a linear search, but since there won't be too much columns, the performance should
669                // be acceptable.
670                ensure!(
671                    !self.primary_key_indices.contains(&index),
672                    error::RemoveColumnInIndexSnafu {
673                        column_name: *column_name,
674                        table_name,
675                    }
676                );
677
678                if let Some(ts_index) = timestamp_index {
679                    // Not allowed to remove column in timestamp index.
680                    ensure!(
681                        index != ts_index,
682                        error::RemoveColumnInIndexSnafu {
683                            column_name: table_schema.column_name_by_index(ts_index),
684                            table_name,
685                        }
686                    );
687                }
688            } else {
689                return error::ColumnNotExistsSnafu {
690                    column_name: *column_name,
691                    table_name,
692                }
693                .fail()?;
694            }
695        }
696
697        // Collect columns after removal.
698        let columns: Vec<_> = table_schema
699            .column_schemas()
700            .iter()
701            .filter(|column_schema| !column_names.contains(&column_schema.name))
702            .cloned()
703            .collect();
704
705        let mut builder = SchemaBuilder::try_from_columns(columns)
706            .with_context(|_| error::SchemaBuildSnafu {
707                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
708            })?
709            // Also bump the schema version.
710            .version(table_schema.version() + 1);
711        for (k, v) in table_schema.metadata().iter() {
712            builder = builder.add_metadata(k, v);
713        }
714        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
715            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
716        })?;
717
718        // Rebuild the indices of primary key columns.
719        let primary_key_indices = self
720            .primary_key_indices
721            .iter()
722            .map(|idx| table_schema.column_name_by_index(*idx))
723            // This unwrap is safe since we don't allow removing a primary key column.
724            .map(|name| new_schema.column_index_by_name(name).unwrap())
725            .collect();
726
727        let _ = meta_builder
728            .schema(Arc::new(new_schema))
729            .primary_key_indices(primary_key_indices);
730
731        Ok(meta_builder)
732    }
733
734    fn modify_column_types(
735        &self,
736        table_name: &str,
737        requests: &[ModifyColumnTypeRequest],
738    ) -> Result<TableMetaBuilder> {
739        let table_schema = &self.schema;
740        let mut meta_builder = self.new_meta_builder();
741
742        let mut modify_column_types = HashMap::with_capacity(requests.len());
743        let timestamp_index = table_schema.timestamp_index();
744
745        for col_to_change in requests {
746            let change_column_name = &col_to_change.column_name;
747
748            let index = table_schema
749                .column_index_by_name(change_column_name)
750                .with_context(|| error::ColumnNotExistsSnafu {
751                    column_name: change_column_name,
752                    table_name,
753                })?;
754
755            let column = &table_schema.column_schemas()[index];
756
757            ensure!(
758                !self.primary_key_indices.contains(&index),
759                error::InvalidAlterRequestSnafu {
760                    table: table_name,
761                    err: format!(
762                        "Not allowed to change primary key index column '{}'",
763                        column.name
764                    )
765                }
766            );
767
768            if let Some(ts_index) = timestamp_index {
769                // Not allowed to change column datatype in timestamp index.
770                ensure!(
771                    index != ts_index,
772                    error::InvalidAlterRequestSnafu {
773                        table: table_name,
774                        err: format!(
775                            "Not allowed to change timestamp index column '{}' datatype",
776                            column.name
777                        )
778                    }
779                );
780            }
781
782            ensure!(
783                modify_column_types
784                    .insert(&col_to_change.column_name, col_to_change)
785                    .is_none(),
786                error::InvalidAlterRequestSnafu {
787                    table: table_name,
788                    err: format!(
789                        "change column datatype {} more than once",
790                        col_to_change.column_name
791                    ),
792                }
793            );
794
795            ensure!(
796                column
797                    .data_type
798                    .can_arrow_type_cast_to(&col_to_change.target_type),
799                error::InvalidAlterRequestSnafu {
800                    table: table_name,
801                    err: format!(
802                        "column '{}' cannot be cast automatically to type '{}'",
803                        col_to_change.column_name, col_to_change.target_type,
804                    ),
805                }
806            );
807
808            ensure!(
809                column.is_nullable(),
810                error::InvalidAlterRequestSnafu {
811                    table: table_name,
812                    err: format!(
813                        "column '{}' must be nullable to ensure safe conversion.",
814                        col_to_change.column_name,
815                    ),
816                }
817            );
818        }
819        // Collect columns after changed.
820
821        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
822        for mut column in table_schema.column_schemas().iter().cloned() {
823            if let Some(change_column) = modify_column_types.get(&column.name) {
824                column.data_type = change_column.target_type.clone();
825                let new_default = if let Some(default_value) = column.default_constraint() {
826                    Some(
827                        default_value
828                            .cast_to_datatype(&change_column.target_type)
829                            .with_context(|_| error::CastDefaultValueSnafu {
830                                reason: format!(
831                                    "Failed to cast default value from {:?} to type {:?}",
832                                    default_value, &change_column.target_type
833                                ),
834                            })?,
835                    )
836                } else {
837                    None
838                };
839                column = column
840                    .clone()
841                    .with_default_constraint(new_default.clone())
842                    .with_context(|_| error::CastDefaultValueSnafu {
843                        reason: format!("Failed to set new default: {:?}", new_default),
844                    })?;
845            }
846            columns.push(column)
847        }
848
849        let mut builder = SchemaBuilder::try_from_columns(columns)
850            .with_context(|_| error::SchemaBuildSnafu {
851                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
852            })?
853            // Also bump the schema version.
854            .version(table_schema.version() + 1);
855        for (k, v) in table_schema.metadata().iter() {
856            builder = builder.add_metadata(k, v);
857        }
858        let new_schema = builder.build().with_context(|_| {
859            let column_names: Vec<_> = requests
860                .iter()
861                .map(|request| &request.column_name)
862                .collect();
863
864            error::SchemaBuildSnafu {
865                msg: format!(
866                    "Table {table_name} cannot change datatype with columns {column_names:?}"
867                ),
868            }
869        })?;
870
871        let _ = meta_builder
872            .schema(Arc::new(new_schema))
873            .primary_key_indices(self.primary_key_indices.clone());
874
875        Ok(meta_builder)
876    }
877
878    /// Split requests into different groups using column location info.
879    fn split_requests_by_column_location<'a>(
880        &self,
881        table_name: &str,
882        requests: &'a [AddColumnRequest],
883    ) -> Result<SplitResult<'a>> {
884        let table_schema = &self.schema;
885        let mut columns_at_first = Vec::new();
886        let mut columns_at_after = HashMap::new();
887        let mut columns_at_last = Vec::new();
888        let mut column_names = Vec::with_capacity(requests.len());
889        for request in requests {
890            // Check whether columns to add are already existing.
891            let column_name = &request.column_schema.name;
892            column_names.push(column_name.clone());
893            ensure!(
894                table_schema.column_schema_by_name(column_name).is_none(),
895                error::ColumnExistsSnafu {
896                    column_name,
897                    table_name,
898                }
899            );
900            match request.location.as_ref() {
901                Some(AddColumnLocation::First) => {
902                    columns_at_first.push(request);
903                }
904                Some(AddColumnLocation::After { column_name }) => {
905                    ensure!(
906                        table_schema.column_schema_by_name(column_name).is_some(),
907                        error::ColumnNotExistsSnafu {
908                            column_name,
909                            table_name,
910                        }
911                    );
912                    columns_at_after
913                        .entry(column_name.clone())
914                        .or_insert(Vec::new())
915                        .push(request);
916                }
917                None => {
918                    columns_at_last.push(request);
919                }
920            }
921        }
922        Ok(SplitResult {
923            columns_at_first,
924            columns_at_after,
925            columns_at_last,
926            column_names,
927        })
928    }
929}
930
931#[derive(Clone, Debug, PartialEq, Eq, Builder)]
932#[builder(pattern = "owned")]
933pub struct TableInfo {
934    /// Id and version of the table.
935    #[builder(default, setter(into))]
936    pub ident: TableIdent,
937    /// Name of the table.
938    #[builder(setter(into))]
939    pub name: String,
940    /// Comment of the table.
941    #[builder(default, setter(into))]
942    pub desc: Option<String>,
943    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
944    pub catalog_name: String,
945    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
946    pub schema_name: String,
947    pub meta: TableMeta,
948    #[builder(default = "TableType::Base")]
949    pub table_type: TableType,
950}
951
952pub type TableInfoRef = Arc<TableInfo>;
953
954impl TableInfo {
955    pub fn table_id(&self) -> TableId {
956        self.ident.table_id
957    }
958
959    pub fn region_ids(&self) -> Vec<RegionId> {
960        self.meta
961            .region_numbers
962            .iter()
963            .map(|id| RegionId::new(self.table_id(), *id))
964            .collect()
965    }
966    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
967    pub fn full_table_name(&self) -> String {
968        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
969    }
970
971    /// Returns true when the table is the metric engine's physical table.
972    pub fn is_physical_table(&self) -> bool {
973        self.meta
974            .options
975            .extra_options
976            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
977    }
978
979    /// Return true if the table's TTL is `instant`.
980    pub fn is_ttl_instant_table(&self) -> bool {
981        self.meta
982            .options
983            .ttl
984            .map(|t| t.is_instant())
985            .unwrap_or(false)
986    }
987}
988
989impl TableInfoBuilder {
990    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
991        Self {
992            name: Some(name.into()),
993            meta: Some(meta),
994            ..Default::default()
995        }
996    }
997
998    pub fn table_id(mut self, id: TableId) -> Self {
999        let ident = self.ident.get_or_insert_with(TableIdent::default);
1000        ident.table_id = id;
1001        self
1002    }
1003
1004    pub fn table_version(mut self, version: TableVersion) -> Self {
1005        let ident = self.ident.get_or_insert_with(TableIdent::default);
1006        ident.version = version;
1007        self
1008    }
1009}
1010
1011impl TableIdent {
1012    pub fn new(table_id: TableId) -> Self {
1013        Self {
1014            table_id,
1015            version: 0,
1016        }
1017    }
1018}
1019
1020impl From<TableId> for TableIdent {
1021    fn from(table_id: TableId) -> Self {
1022        Self::new(table_id)
1023    }
1024}
1025
1026/// Struct used to serialize and deserialize [`TableMeta`].
1027#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1028pub struct RawTableMeta {
1029    pub schema: RawSchema,
1030    /// The indices of columns in primary key. Note that the index of timestamp column
1031    /// is not included. Order matters to this array.
1032    pub primary_key_indices: Vec<usize>,
1033    ///  The indices of columns in value. Order doesn't matter to this array.
1034    pub value_indices: Vec<usize>,
1035    /// Engine type of this table. Usually in small case.
1036    pub engine: String,
1037    /// Next column id of a new column.
1038    /// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982
1039    pub next_column_id: ColumnId,
1040    pub region_numbers: Vec<u32>,
1041    pub options: TableOptions,
1042    pub created_on: DateTime<Utc>,
1043    /// Order doesn't matter to this array.
1044    #[serde(default)]
1045    pub partition_key_indices: Vec<usize>,
1046}
1047
1048impl From<TableMeta> for RawTableMeta {
1049    fn from(meta: TableMeta) -> RawTableMeta {
1050        RawTableMeta {
1051            schema: RawSchema::from(&*meta.schema),
1052            primary_key_indices: meta.primary_key_indices,
1053            value_indices: meta.value_indices,
1054            engine: meta.engine,
1055            next_column_id: meta.next_column_id,
1056            region_numbers: meta.region_numbers,
1057            options: meta.options,
1058            created_on: meta.created_on,
1059            partition_key_indices: meta.partition_key_indices,
1060        }
1061    }
1062}
1063
1064impl TryFrom<RawTableMeta> for TableMeta {
1065    type Error = ConvertError;
1066
1067    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1068        Ok(TableMeta {
1069            schema: Arc::new(Schema::try_from(raw.schema)?),
1070            primary_key_indices: raw.primary_key_indices,
1071            value_indices: raw.value_indices,
1072            engine: raw.engine,
1073            region_numbers: raw.region_numbers,
1074            next_column_id: raw.next_column_id,
1075            options: raw.options,
1076            created_on: raw.created_on,
1077            partition_key_indices: raw.partition_key_indices,
1078        })
1079    }
1080}
1081
1082/// Struct used to serialize and deserialize [`TableInfo`].
1083#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1084pub struct RawTableInfo {
1085    pub ident: TableIdent,
1086    pub name: String,
1087    pub desc: Option<String>,
1088    pub catalog_name: String,
1089    pub schema_name: String,
1090    pub meta: RawTableMeta,
1091    pub table_type: TableType,
1092}
1093
1094impl RawTableInfo {
1095    /// Sort the columns in [RawTableInfo], logical tables require it.
1096    pub fn sort_columns(&mut self) {
1097        let column_schemas = &self.meta.schema.column_schemas;
1098        let primary_keys = self
1099            .meta
1100            .primary_key_indices
1101            .iter()
1102            .map(|index| column_schemas[*index].name.clone())
1103            .collect::<HashSet<_>>();
1104
1105        self.meta
1106            .schema
1107            .column_schemas
1108            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1109
1110        // Compute new indices of sorted columns
1111        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1112        let mut timestamp_index = None;
1113        let mut value_indices =
1114            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
1115        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1116            if primary_keys.contains(&column_schema.name) {
1117                primary_key_indices.push(index);
1118            } else if column_schema.is_time_index() {
1119                timestamp_index = Some(index);
1120            } else {
1121                value_indices.push(index);
1122            }
1123        }
1124
1125        // Overwrite table meta
1126        self.meta.schema.timestamp_index = timestamp_index;
1127        self.meta.primary_key_indices = primary_key_indices;
1128        self.meta.value_indices = value_indices;
1129    }
1130
1131    /// Extracts region options from table info.
1132    ///
1133    /// All "region options" are actually a copy of table options for redundancy.
1134    pub fn to_region_options(&self) -> HashMap<String, String> {
1135        HashMap::from(&self.meta.options)
1136    }
1137}
1138
1139impl From<TableInfo> for RawTableInfo {
1140    fn from(info: TableInfo) -> RawTableInfo {
1141        RawTableInfo {
1142            ident: info.ident,
1143            name: info.name,
1144            desc: info.desc,
1145            catalog_name: info.catalog_name,
1146            schema_name: info.schema_name,
1147            meta: RawTableMeta::from(info.meta),
1148            table_type: info.table_type,
1149        }
1150    }
1151}
1152
1153impl TryFrom<RawTableInfo> for TableInfo {
1154    type Error = ConvertError;
1155
1156    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1157        Ok(TableInfo {
1158            ident: raw.ident,
1159            name: raw.name,
1160            desc: raw.desc,
1161            catalog_name: raw.catalog_name,
1162            schema_name: raw.schema_name,
1163            meta: TableMeta::try_from(raw.meta)?,
1164            table_type: raw.table_type,
1165        })
1166    }
1167}
1168
1169/// Set column fulltext options if it passed the validation.
1170///
1171/// Options allowed to modify:
1172/// * backend
1173///
1174/// Options not allowed to modify:
1175/// * analyzer
1176/// * case_sensitive
1177fn set_column_fulltext_options(
1178    column_schema: &mut ColumnSchema,
1179    column_name: &str,
1180    options: &FulltextOptions,
1181    current_options: Option<FulltextOptions>,
1182) -> Result<()> {
1183    if let Some(current_options) = current_options {
1184        ensure!(
1185            current_options.analyzer == options.analyzer
1186                && current_options.case_sensitive == options.case_sensitive,
1187            error::InvalidColumnOptionSnafu {
1188                column_name,
1189                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1190                current_options.analyzer, current_options.case_sensitive),
1191            }
1192        );
1193    }
1194
1195    column_schema
1196        .set_fulltext_options(options)
1197        .context(error::SetFulltextOptionsSnafu { column_name })?;
1198
1199    Ok(())
1200}
1201
1202fn unset_column_fulltext_options(
1203    column_schema: &mut ColumnSchema,
1204    column_name: &str,
1205    current_options: Option<FulltextOptions>,
1206) -> Result<()> {
1207    ensure!(
1208        current_options
1209            .as_ref()
1210            .is_some_and(|options| options.enable),
1211        error::InvalidColumnOptionSnafu {
1212            column_name,
1213            msg: "FULLTEXT index already disabled".to_string(),
1214        }
1215    );
1216
1217    let mut options = current_options.unwrap();
1218    options.enable = false;
1219    column_schema
1220        .set_fulltext_options(&options)
1221        .context(error::SetFulltextOptionsSnafu { column_name })?;
1222
1223    Ok(())
1224}
1225
1226fn set_column_skipping_index_options(
1227    column_schema: &mut ColumnSchema,
1228    column_name: &str,
1229    options: &SkippingIndexOptions,
1230) -> Result<()> {
1231    column_schema
1232        .set_skipping_options(options)
1233        .context(error::SetSkippingOptionsSnafu { column_name })?;
1234
1235    Ok(())
1236}
1237
1238fn unset_column_skipping_index_options(
1239    column_schema: &mut ColumnSchema,
1240    column_name: &str,
1241) -> Result<()> {
1242    column_schema
1243        .unset_skipping_options()
1244        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1245    Ok(())
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250    use common_error::ext::ErrorExt;
1251    use common_error::status_code::StatusCode;
1252    use datatypes::data_type::ConcreteDataType;
1253    use datatypes::schema::{
1254        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1255    };
1256
1257    use super::*;
1258
1259    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1260    fn new_test_schema() -> Schema {
1261        let column_schemas = vec![
1262            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1263            ColumnSchema::new(
1264                "ts",
1265                ConcreteDataType::timestamp_millisecond_datatype(),
1266                false,
1267            )
1268            .with_time_index(true),
1269            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1270        ];
1271        SchemaBuilder::try_from(column_schemas)
1272            .unwrap()
1273            .version(123)
1274            .build()
1275            .unwrap()
1276    }
1277
1278    #[test]
1279    fn test_raw_convert() {
1280        let schema = Arc::new(new_test_schema());
1281        let meta = TableMetaBuilder::empty()
1282            .schema(schema)
1283            .primary_key_indices(vec![0])
1284            .engine("engine")
1285            .next_column_id(3)
1286            .build()
1287            .unwrap();
1288        let info = TableInfoBuilder::default()
1289            .table_id(10)
1290            .table_version(5)
1291            .name("mytable")
1292            .meta(meta)
1293            .build()
1294            .unwrap();
1295
1296        let raw = RawTableInfo::from(info.clone());
1297        let info_new = TableInfo::try_from(raw).unwrap();
1298
1299        assert_eq!(info, info_new);
1300    }
1301
1302    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1303        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1304        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1305        let alter_kind = AlterKind::AddColumns {
1306            columns: vec![
1307                AddColumnRequest {
1308                    column_schema: new_tag,
1309                    is_key: true,
1310                    location: None,
1311                    add_if_not_exists: false,
1312                },
1313                AddColumnRequest {
1314                    column_schema: new_field,
1315                    is_key: false,
1316                    location: None,
1317                    add_if_not_exists: false,
1318                },
1319            ],
1320        };
1321
1322        let builder = meta
1323            .builder_with_alter_kind("my_table", &alter_kind)
1324            .unwrap();
1325        builder.build().unwrap()
1326    }
1327
1328    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1329        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1330        let new_field = ColumnSchema::new(
1331            "my_field_after_ts",
1332            ConcreteDataType::string_datatype(),
1333            true,
1334        );
1335        let alter_kind = AlterKind::AddColumns {
1336            columns: vec![
1337                AddColumnRequest {
1338                    column_schema: new_tag,
1339                    is_key: true,
1340                    location: Some(AddColumnLocation::First),
1341                    add_if_not_exists: false,
1342                },
1343                AddColumnRequest {
1344                    column_schema: new_field,
1345                    is_key: false,
1346                    location: Some(AddColumnLocation::After {
1347                        column_name: "ts".to_string(),
1348                    }),
1349                    add_if_not_exists: false,
1350                },
1351            ],
1352        };
1353
1354        let builder = meta
1355            .builder_with_alter_kind("my_table", &alter_kind)
1356            .unwrap();
1357        builder.build().unwrap()
1358    }
1359
1360    #[test]
1361    fn test_add_columns() {
1362        let schema = Arc::new(new_test_schema());
1363        let meta = TableMetaBuilder::empty()
1364            .schema(schema)
1365            .primary_key_indices(vec![0])
1366            .engine("engine")
1367            .next_column_id(3)
1368            .build()
1369            .unwrap();
1370
1371        let new_meta = add_columns_to_meta(&meta);
1372        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1373
1374        let names: Vec<String> = new_meta
1375            .schema
1376            .column_schemas()
1377            .iter()
1378            .map(|column_schema| column_schema.name.clone())
1379            .collect();
1380        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1381        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1382        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1383    }
1384
1385    #[test]
1386    fn test_add_columns_multiple_times() {
1387        let schema = Arc::new(new_test_schema());
1388        let meta = TableMetaBuilder::empty()
1389            .schema(schema)
1390            .primary_key_indices(vec![0])
1391            .engine("engine")
1392            .next_column_id(3)
1393            .build()
1394            .unwrap();
1395
1396        let alter_kind = AlterKind::AddColumns {
1397            columns: vec![
1398                AddColumnRequest {
1399                    column_schema: ColumnSchema::new(
1400                        "col3",
1401                        ConcreteDataType::int32_datatype(),
1402                        true,
1403                    ),
1404                    is_key: true,
1405                    location: None,
1406                    add_if_not_exists: true,
1407                },
1408                AddColumnRequest {
1409                    column_schema: ColumnSchema::new(
1410                        "col3",
1411                        ConcreteDataType::int32_datatype(),
1412                        true,
1413                    ),
1414                    is_key: true,
1415                    location: None,
1416                    add_if_not_exists: true,
1417                },
1418            ],
1419        };
1420        let err = meta
1421            .builder_with_alter_kind("my_table", &alter_kind)
1422            .err()
1423            .unwrap();
1424        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1425    }
1426
1427    #[test]
1428    fn test_remove_columns() {
1429        let schema = Arc::new(new_test_schema());
1430        let meta = TableMetaBuilder::empty()
1431            .schema(schema.clone())
1432            .primary_key_indices(vec![0])
1433            .engine("engine")
1434            .next_column_id(3)
1435            .build()
1436            .unwrap();
1437        // Add more columns so we have enough candidate columns to remove.
1438        let meta = add_columns_to_meta(&meta);
1439
1440        let alter_kind = AlterKind::DropColumns {
1441            names: vec![String::from("col2"), String::from("my_field")],
1442        };
1443        let new_meta = meta
1444            .builder_with_alter_kind("my_table", &alter_kind)
1445            .unwrap()
1446            .build()
1447            .unwrap();
1448
1449        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1450
1451        let names: Vec<String> = new_meta
1452            .schema
1453            .column_schemas()
1454            .iter()
1455            .map(|column_schema| column_schema.name.clone())
1456            .collect();
1457        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1458        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1459        assert_eq!(&[1], &new_meta.value_indices[..]);
1460        assert_eq!(
1461            schema.timestamp_column(),
1462            new_meta.schema.timestamp_column()
1463        );
1464    }
1465
1466    #[test]
1467    fn test_remove_multiple_columns_before_timestamp() {
1468        let column_schemas = vec![
1469            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1470            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1471            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1472            ColumnSchema::new(
1473                "ts",
1474                ConcreteDataType::timestamp_millisecond_datatype(),
1475                false,
1476            )
1477            .with_time_index(true),
1478        ];
1479        let schema = Arc::new(
1480            SchemaBuilder::try_from(column_schemas)
1481                .unwrap()
1482                .version(123)
1483                .build()
1484                .unwrap(),
1485        );
1486        let meta = TableMetaBuilder::empty()
1487            .schema(schema.clone())
1488            .primary_key_indices(vec![1])
1489            .engine("engine")
1490            .next_column_id(4)
1491            .build()
1492            .unwrap();
1493
1494        // Remove columns in reverse order to test whether timestamp index is valid.
1495        let alter_kind = AlterKind::DropColumns {
1496            names: vec![String::from("col3"), String::from("col1")],
1497        };
1498        let new_meta = meta
1499            .builder_with_alter_kind("my_table", &alter_kind)
1500            .unwrap()
1501            .build()
1502            .unwrap();
1503
1504        let names: Vec<String> = new_meta
1505            .schema
1506            .column_schemas()
1507            .iter()
1508            .map(|column_schema| column_schema.name.clone())
1509            .collect();
1510        assert_eq!(&["col2", "ts"], &names[..]);
1511        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1512        assert_eq!(&[1], &new_meta.value_indices[..]);
1513        assert_eq!(
1514            schema.timestamp_column(),
1515            new_meta.schema.timestamp_column()
1516        );
1517    }
1518
1519    #[test]
1520    fn test_add_existing_column() {
1521        let schema = Arc::new(new_test_schema());
1522        let meta = TableMetaBuilder::empty()
1523            .schema(schema)
1524            .primary_key_indices(vec![0])
1525            .engine("engine")
1526            .next_column_id(3)
1527            .build()
1528            .unwrap();
1529
1530        let alter_kind = AlterKind::AddColumns {
1531            columns: vec![AddColumnRequest {
1532                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1533                is_key: false,
1534                location: None,
1535                add_if_not_exists: false,
1536            }],
1537        };
1538
1539        let err = meta
1540            .builder_with_alter_kind("my_table", &alter_kind)
1541            .err()
1542            .unwrap();
1543        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1544
1545        // Add if not exists
1546        let alter_kind = AlterKind::AddColumns {
1547            columns: vec![AddColumnRequest {
1548                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1549                is_key: true,
1550                location: None,
1551                add_if_not_exists: true,
1552            }],
1553        };
1554        let new_meta = meta
1555            .builder_with_alter_kind("my_table", &alter_kind)
1556            .unwrap()
1557            .build()
1558            .unwrap();
1559        assert_eq!(
1560            meta.schema.column_schemas(),
1561            new_meta.schema.column_schemas()
1562        );
1563        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1564    }
1565
1566    #[test]
1567    fn test_add_different_type_column() {
1568        let schema = Arc::new(new_test_schema());
1569        let meta = TableMetaBuilder::empty()
1570            .schema(schema)
1571            .primary_key_indices(vec![0])
1572            .engine("engine")
1573            .next_column_id(3)
1574            .build()
1575            .unwrap();
1576
1577        // Add if not exists, but different type.
1578        let alter_kind = AlterKind::AddColumns {
1579            columns: vec![AddColumnRequest {
1580                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1581                is_key: false,
1582                location: None,
1583                add_if_not_exists: true,
1584            }],
1585        };
1586        let err = meta
1587            .builder_with_alter_kind("my_table", &alter_kind)
1588            .err()
1589            .unwrap();
1590        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1591    }
1592
1593    #[test]
1594    fn test_add_invalid_column() {
1595        let schema = Arc::new(new_test_schema());
1596        let meta = TableMetaBuilder::empty()
1597            .schema(schema)
1598            .primary_key_indices(vec![0])
1599            .engine("engine")
1600            .next_column_id(3)
1601            .build()
1602            .unwrap();
1603
1604        // Not nullable and no default value.
1605        let alter_kind = AlterKind::AddColumns {
1606            columns: vec![AddColumnRequest {
1607                column_schema: ColumnSchema::new(
1608                    "weny",
1609                    ConcreteDataType::string_datatype(),
1610                    false,
1611                ),
1612                is_key: false,
1613                location: None,
1614                add_if_not_exists: false,
1615            }],
1616        };
1617
1618        let err = meta
1619            .builder_with_alter_kind("my_table", &alter_kind)
1620            .err()
1621            .unwrap();
1622        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1623    }
1624
1625    #[test]
1626    fn test_remove_unknown_column() {
1627        let schema = Arc::new(new_test_schema());
1628        let meta = TableMetaBuilder::empty()
1629            .schema(schema)
1630            .primary_key_indices(vec![0])
1631            .engine("engine")
1632            .next_column_id(3)
1633            .build()
1634            .unwrap();
1635
1636        let alter_kind = AlterKind::DropColumns {
1637            names: vec![String::from("unknown")],
1638        };
1639
1640        let err = meta
1641            .builder_with_alter_kind("my_table", &alter_kind)
1642            .err()
1643            .unwrap();
1644        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1645    }
1646
1647    #[test]
1648    fn test_change_unknown_column_data_type() {
1649        let schema = Arc::new(new_test_schema());
1650        let meta = TableMetaBuilder::empty()
1651            .schema(schema)
1652            .primary_key_indices(vec![0])
1653            .engine("engine")
1654            .next_column_id(3)
1655            .build()
1656            .unwrap();
1657
1658        let alter_kind = AlterKind::ModifyColumnTypes {
1659            columns: vec![ModifyColumnTypeRequest {
1660                column_name: "unknown".to_string(),
1661                target_type: ConcreteDataType::string_datatype(),
1662            }],
1663        };
1664
1665        let err = meta
1666            .builder_with_alter_kind("my_table", &alter_kind)
1667            .err()
1668            .unwrap();
1669        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1670    }
1671
1672    #[test]
1673    fn test_remove_key_column() {
1674        let schema = Arc::new(new_test_schema());
1675        let meta = TableMetaBuilder::empty()
1676            .schema(schema)
1677            .primary_key_indices(vec![0])
1678            .engine("engine")
1679            .next_column_id(3)
1680            .build()
1681            .unwrap();
1682
1683        // Remove column in primary key.
1684        let alter_kind = AlterKind::DropColumns {
1685            names: vec![String::from("col1")],
1686        };
1687
1688        let err = meta
1689            .builder_with_alter_kind("my_table", &alter_kind)
1690            .err()
1691            .unwrap();
1692        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1693
1694        // Remove timestamp column.
1695        let alter_kind = AlterKind::DropColumns {
1696            names: vec![String::from("ts")],
1697        };
1698
1699        let err = meta
1700            .builder_with_alter_kind("my_table", &alter_kind)
1701            .err()
1702            .unwrap();
1703        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1704    }
1705
1706    #[test]
1707    fn test_change_key_column_data_type() {
1708        let schema = Arc::new(new_test_schema());
1709        let meta = TableMetaBuilder::empty()
1710            .schema(schema)
1711            .primary_key_indices(vec![0])
1712            .engine("engine")
1713            .next_column_id(3)
1714            .build()
1715            .unwrap();
1716
1717        // Remove column in primary key.
1718        let alter_kind = AlterKind::ModifyColumnTypes {
1719            columns: vec![ModifyColumnTypeRequest {
1720                column_name: "col1".to_string(),
1721                target_type: ConcreteDataType::string_datatype(),
1722            }],
1723        };
1724
1725        let err = meta
1726            .builder_with_alter_kind("my_table", &alter_kind)
1727            .err()
1728            .unwrap();
1729        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1730
1731        // Remove timestamp column.
1732        let alter_kind = AlterKind::ModifyColumnTypes {
1733            columns: vec![ModifyColumnTypeRequest {
1734                column_name: "ts".to_string(),
1735                target_type: ConcreteDataType::string_datatype(),
1736            }],
1737        };
1738
1739        let err = meta
1740            .builder_with_alter_kind("my_table", &alter_kind)
1741            .err()
1742            .unwrap();
1743        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1744    }
1745
1746    #[test]
1747    fn test_alloc_new_column() {
1748        let schema = Arc::new(new_test_schema());
1749        let mut meta = TableMetaBuilder::empty()
1750            .schema(schema)
1751            .primary_key_indices(vec![0])
1752            .engine("engine")
1753            .next_column_id(3)
1754            .build()
1755            .unwrap();
1756        assert_eq!(3, meta.next_column_id);
1757
1758        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1759        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1760
1761        assert_eq!(4, meta.next_column_id);
1762        assert_eq!(column_schema.name, desc.name);
1763    }
1764
1765    #[test]
1766    fn test_add_columns_with_location() {
1767        let schema = Arc::new(new_test_schema());
1768        let meta = TableMetaBuilder::empty()
1769            .schema(schema)
1770            .primary_key_indices(vec![0])
1771            .engine("engine")
1772            .next_column_id(3)
1773            .build()
1774            .unwrap();
1775
1776        let new_meta = add_columns_to_meta_with_location(&meta);
1777        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1778
1779        let names: Vec<String> = new_meta
1780            .schema
1781            .column_schemas()
1782            .iter()
1783            .map(|column_schema| column_schema.name.clone())
1784            .collect();
1785        assert_eq!(
1786            &["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"],
1787            &names[..]
1788        );
1789        assert_eq!(&[0, 1], &new_meta.primary_key_indices[..]);
1790        assert_eq!(&[2, 3, 4], &new_meta.value_indices[..]);
1791    }
1792
1793    #[test]
1794    fn test_modify_column_fulltext_options() {
1795        let schema = Arc::new(new_test_schema());
1796        let meta = TableMetaBuilder::empty()
1797            .schema(schema)
1798            .primary_key_indices(vec![0])
1799            .engine("engine")
1800            .next_column_id(3)
1801            .build()
1802            .unwrap();
1803
1804        let alter_kind = AlterKind::SetIndex {
1805            options: SetIndexOptions::Fulltext {
1806                column_name: "col1".to_string(),
1807                options: FulltextOptions::default(),
1808            },
1809        };
1810        let err = meta
1811            .builder_with_alter_kind("my_table", &alter_kind)
1812            .err()
1813            .unwrap();
1814        assert_eq!(
1815            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
1816            err.to_string()
1817        );
1818
1819        // Add a string column and make it fulltext indexed
1820        let new_meta = add_columns_to_meta_with_location(&meta);
1821        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1822
1823        let alter_kind = AlterKind::SetIndex {
1824            options: SetIndexOptions::Fulltext {
1825                column_name: "my_tag_first".to_string(),
1826                options: FulltextOptions {
1827                    enable: true,
1828                    analyzer: FulltextAnalyzer::Chinese,
1829                    case_sensitive: true,
1830                    backend: FulltextBackend::Bloom,
1831                },
1832            },
1833        };
1834        let new_meta = new_meta
1835            .builder_with_alter_kind("my_table", &alter_kind)
1836            .unwrap()
1837            .build()
1838            .unwrap();
1839        let column_schema = new_meta
1840            .schema
1841            .column_schema_by_name("my_tag_first")
1842            .unwrap();
1843        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1844        assert!(fulltext_options.enable);
1845        assert_eq!(
1846            datatypes::schema::FulltextAnalyzer::Chinese,
1847            fulltext_options.analyzer
1848        );
1849        assert!(fulltext_options.case_sensitive);
1850
1851        let alter_kind = AlterKind::UnsetIndex {
1852            options: UnsetIndexOptions::Fulltext {
1853                column_name: "my_tag_first".to_string(),
1854            },
1855        };
1856        let new_meta = new_meta
1857            .builder_with_alter_kind("my_table", &alter_kind)
1858            .unwrap()
1859            .build()
1860            .unwrap();
1861        let column_schema = new_meta
1862            .schema
1863            .column_schema_by_name("my_tag_first")
1864            .unwrap();
1865        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1866        assert!(!fulltext_options.enable);
1867    }
1868}