table/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
20use common_macro::ToMetaBuilder;
21use common_query::AddColumnLocation;
22use datafusion_expr::TableProviderFilterPushDown;
23pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
24use datatypes::schema::{
25    ColumnSchema, FulltextOptions, RawSchema, Schema, SchemaBuilder, SchemaRef,
26    SkippingIndexOptions,
27};
28use derive_builder::Builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
32use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
33use store_api::region_request::{SetRegionOption, UnsetRegionOption};
34use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
35
36use crate::error::{self, Result};
37use crate::requests::{
38    AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
39    UnsetIndexOptions,
40};
41
42pub type TableId = u32;
43pub type TableVersion = u64;
44
45/// Indicates whether and how a filter expression can be handled by a
46/// Table for table scans.
47#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
48pub enum FilterPushDownType {
49    /// The expression cannot be used by the provider.
50    Unsupported,
51    /// The expression can be used to help minimise the data retrieved,
52    /// but the provider cannot guarantee that all returned tuples
53    /// satisfy the filter. The Filter plan node containing this expression
54    /// will be preserved.
55    Inexact,
56    /// The provider guarantees that all returned data satisfies this
57    /// filter expression. The Filter plan node containing this expression
58    /// will be removed.
59    Exact,
60}
61
62impl From<TableProviderFilterPushDown> for FilterPushDownType {
63    fn from(value: TableProviderFilterPushDown) -> Self {
64        match value {
65            TableProviderFilterPushDown::Unsupported => FilterPushDownType::Unsupported,
66            TableProviderFilterPushDown::Inexact => FilterPushDownType::Inexact,
67            TableProviderFilterPushDown::Exact => FilterPushDownType::Exact,
68        }
69    }
70}
71
72impl From<FilterPushDownType> for TableProviderFilterPushDown {
73    fn from(value: FilterPushDownType) -> Self {
74        match value {
75            FilterPushDownType::Unsupported => TableProviderFilterPushDown::Unsupported,
76            FilterPushDownType::Inexact => TableProviderFilterPushDown::Inexact,
77            FilterPushDownType::Exact => TableProviderFilterPushDown::Exact,
78        }
79    }
80}
81
82/// Indicates the type of this table for metadata/catalog purposes.
83#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
84pub enum TableType {
85    /// An ordinary physical table.
86    Base,
87    /// A non-materialised table that itself uses a query internally to provide data.
88    View,
89    /// A transient table.
90    Temporary,
91}
92
93impl std::fmt::Display for TableType {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        match self {
96            TableType::Base => f.write_str("BASE TABLE"),
97            TableType::Temporary => f.write_str("TEMPORARY"),
98            TableType::View => f.write_str("VIEW"),
99        }
100    }
101}
102
103/// Identifier of the table.
104#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
105pub struct TableIdent {
106    /// Unique id of this table.
107    pub table_id: TableId,
108    /// Version of the table, bumped when metadata (such as schema) of the table
109    /// being changed.
110    pub version: TableVersion,
111}
112
113/// The table metadata.
114///
115/// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
116#[derive(Clone, Debug, Builder, PartialEq, Eq, ToMetaBuilder)]
117#[builder(pattern = "mutable", custom_constructor)]
118pub struct TableMeta {
119    pub schema: SchemaRef,
120    /// The indices of columns in primary key. Note that the index of timestamp column
121    /// is not included in these indices.
122    pub primary_key_indices: Vec<usize>,
123    #[builder(default = "self.default_value_indices()?")]
124    pub value_indices: Vec<usize>,
125    #[builder(default, setter(into))]
126    pub engine: String,
127    #[builder(default, setter(into))]
128    pub region_numbers: Vec<u32>,
129    pub next_column_id: ColumnId,
130    /// Table options.
131    #[builder(default)]
132    pub options: TableOptions,
133    #[builder(default = "Utc::now()")]
134    pub created_on: DateTime<Utc>,
135    #[builder(default = "Vec::new()")]
136    pub partition_key_indices: Vec<usize>,
137}
138
139impl TableMetaBuilder {
140    /// Note: Please always use [new_meta_builder] to create new [TableMetaBuilder].
141    #[cfg(any(test, feature = "testing"))]
142    pub fn empty() -> Self {
143        Self {
144            schema: None,
145            primary_key_indices: None,
146            value_indices: None,
147            engine: None,
148            region_numbers: None,
149            next_column_id: None,
150            options: None,
151            created_on: None,
152            partition_key_indices: None,
153        }
154    }
155}
156
157impl TableMetaBuilder {
158    fn default_value_indices(&self) -> std::result::Result<Vec<usize>, String> {
159        match (&self.primary_key_indices, &self.schema) {
160            (Some(v), Some(schema)) => {
161                let column_schemas = schema.column_schemas();
162                Ok((0..column_schemas.len())
163                    .filter(|idx| !v.contains(idx))
164                    .collect())
165            }
166            _ => Err("Missing primary_key_indices or schema to create value_indices".to_string()),
167        }
168    }
169
170    pub fn new_external_table() -> Self {
171        Self {
172            schema: None,
173            primary_key_indices: Some(Vec::new()),
174            value_indices: Some(Vec::new()),
175            engine: None,
176            region_numbers: Some(Vec::new()),
177            next_column_id: Some(0),
178            options: None,
179            created_on: None,
180            partition_key_indices: None,
181        }
182    }
183}
184
185/// The result after splitting requests by column location info.
186struct SplitResult<'a> {
187    /// column requests should be added at first place.
188    columns_at_first: Vec<&'a AddColumnRequest>,
189    /// column requests should be added after already exist columns.
190    columns_at_after: HashMap<String, Vec<&'a AddColumnRequest>>,
191    /// column requests should be added at last place.
192    columns_at_last: Vec<&'a AddColumnRequest>,
193    /// all column names should be added.
194    column_names: Vec<String>,
195}
196
197impl TableMeta {
198    pub fn row_key_column_names(&self) -> impl Iterator<Item = &String> {
199        let columns_schemas = &self.schema.column_schemas();
200        self.primary_key_indices
201            .iter()
202            .map(|idx| &columns_schemas[*idx].name)
203    }
204
205    pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
206        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
207        let columns_schemas = self.schema.column_schemas();
208        let primary_key_indices = &self.primary_key_indices;
209        columns_schemas
210            .iter()
211            .enumerate()
212            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
213            .map(|(_, cs)| &cs.name)
214    }
215
216    /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
217    ///
218    /// The returned builder would derive the next column id of this meta.
219    pub fn builder_with_alter_kind(
220        &self,
221        table_name: &str,
222        alter_kind: &AlterKind,
223    ) -> Result<TableMetaBuilder> {
224        match alter_kind {
225            AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
226            AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
227            AlterKind::ModifyColumnTypes { columns } => {
228                self.modify_column_types(table_name, columns)
229            }
230            // No need to rebuild table meta when renaming tables.
231            AlterKind::RenameTable { .. } => Ok(self.new_meta_builder()),
232            AlterKind::SetTableOptions { options } => self.set_table_options(options),
233            AlterKind::UnsetTableOptions { keys } => self.unset_table_options(keys),
234            AlterKind::SetIndex { options } => match options {
235                SetIndexOptions::Fulltext {
236                    column_name,
237                    options,
238                } => self.change_column_fulltext_options(
239                    table_name,
240                    column_name,
241                    true,
242                    Some(options),
243                ),
244                SetIndexOptions::Inverted { column_name } => {
245                    self.change_column_modify_inverted_index(table_name, column_name, true)
246                }
247                SetIndexOptions::Skipping {
248                    column_name,
249                    options,
250                } => self.change_column_skipping_index_options(
251                    table_name,
252                    column_name,
253                    Some(options),
254                ),
255            },
256            AlterKind::UnsetIndex { options } => match options {
257                UnsetIndexOptions::Fulltext { column_name } => {
258                    self.change_column_fulltext_options(table_name, column_name, false, None)
259                }
260                UnsetIndexOptions::Inverted { column_name } => {
261                    self.change_column_modify_inverted_index(table_name, column_name, false)
262                }
263                UnsetIndexOptions::Skipping { column_name } => {
264                    self.change_column_skipping_index_options(table_name, column_name, None)
265                }
266            },
267            AlterKind::DropDefaults { names } => self.drop_defaults(table_name, names),
268        }
269    }
270
271    /// Creates a [TableMetaBuilder] with modified table options.
272    fn set_table_options(&self, requests: &[SetRegionOption]) -> Result<TableMetaBuilder> {
273        let mut new_options = self.options.clone();
274
275        for request in requests {
276            match request {
277                SetRegionOption::Ttl(new_ttl) => {
278                    new_options.ttl = *new_ttl;
279                }
280                SetRegionOption::Twsc(key, value) => {
281                    if !value.is_empty() {
282                        new_options
283                            .extra_options
284                            .insert(key.to_string(), value.to_string());
285                        // Ensure node restart correctly.
286                        new_options.extra_options.insert(
287                            COMPACTION_TYPE.to_string(),
288                            COMPACTION_TYPE_TWCS.to_string(),
289                        );
290                    } else {
291                        // Invalidate the previous change option if an empty value has been set.
292                        new_options.extra_options.remove(key.as_str());
293                    }
294                }
295            }
296        }
297        let mut builder = self.new_meta_builder();
298        builder.options(new_options);
299
300        Ok(builder)
301    }
302
303    fn unset_table_options(&self, requests: &[UnsetRegionOption]) -> Result<TableMetaBuilder> {
304        let requests = requests.iter().map(Into::into).collect::<Vec<_>>();
305        self.set_table_options(&requests)
306    }
307
308    /// Creates a [TableMetaBuilder] with modified column inverted index.
309    fn change_column_modify_inverted_index(
310        &self,
311        table_name: &str,
312        column_name: &str,
313        value: bool,
314    ) -> Result<TableMetaBuilder> {
315        let table_schema = &self.schema;
316        let mut meta_builder = self.new_meta_builder();
317
318        let mut columns: Vec<ColumnSchema> =
319            Vec::with_capacity(table_schema.column_schemas().len());
320
321        for column_schema in table_schema.column_schemas().iter() {
322            if column_schema.name == column_name {
323                let mut new_column_schema = column_schema.clone();
324                new_column_schema.set_inverted_index(value);
325                columns.push(new_column_schema);
326            } else {
327                columns.push(column_schema.clone());
328            }
329        }
330
331        // TODO(CookiePieWw): This part for all alter table operations is similar. We can refactor it.
332        let mut builder = SchemaBuilder::try_from_columns(columns)
333            .with_context(|_| error::SchemaBuildSnafu {
334                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
335            })?
336            .version(table_schema.version() + 1);
337
338        for (k, v) in table_schema.metadata().iter() {
339            builder = builder.add_metadata(k, v);
340        }
341
342        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
343            msg: format!(
344                "Table {table_name} cannot change fulltext options for column {column_name}",
345            ),
346        })?;
347
348        let _ = meta_builder
349            .schema(Arc::new(new_schema))
350            .primary_key_indices(self.primary_key_indices.clone());
351
352        Ok(meta_builder)
353    }
354
355    /// Creates a [TableMetaBuilder] with modified column fulltext options.
356    fn change_column_fulltext_options(
357        &self,
358        table_name: &str,
359        column_name: &str,
360        enable: bool,
361        options: Option<&FulltextOptions>,
362    ) -> Result<TableMetaBuilder> {
363        let table_schema = &self.schema;
364        let mut meta_builder = self.new_meta_builder();
365
366        let column = &table_schema
367            .column_schema_by_name(column_name)
368            .with_context(|| error::ColumnNotExistsSnafu {
369                column_name,
370                table_name,
371            })?;
372
373        ensure!(
374            column.data_type.is_string(),
375            error::InvalidColumnOptionSnafu {
376                column_name,
377                msg: "FULLTEXT index only supports string type",
378            }
379        );
380
381        let current_fulltext_options = column
382            .fulltext_options()
383            .context(error::SetFulltextOptionsSnafu { column_name })?;
384
385        let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
386        for column_schema in table_schema.column_schemas() {
387            if column_schema.name == column_name {
388                let mut new_column_schema = column_schema.clone();
389                if enable {
390                    ensure!(
391                        options.is_some(),
392                        error::InvalidColumnOptionSnafu {
393                            column_name,
394                            msg: "FULLTEXT index options must be provided",
395                        }
396                    );
397                    set_column_fulltext_options(
398                        &mut new_column_schema,
399                        column_name,
400                        options.unwrap(),
401                        current_fulltext_options.clone(),
402                    )?
403                } else {
404                    unset_column_fulltext_options(
405                        &mut new_column_schema,
406                        column_name,
407                        current_fulltext_options.clone(),
408                    )?
409                }
410                columns.push(new_column_schema);
411            } else {
412                columns.push(column_schema.clone());
413            }
414        }
415
416        // TODO(CookiePieWw): This part for all alter table operations is similar. We can refactor it.
417        let mut builder = SchemaBuilder::try_from_columns(columns)
418            .with_context(|_| error::SchemaBuildSnafu {
419                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
420            })?
421            .version(table_schema.version() + 1);
422
423        for (k, v) in table_schema.metadata().iter() {
424            builder = builder.add_metadata(k, v);
425        }
426
427        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
428            msg: format!(
429                "Table {table_name} cannot change fulltext options for column {column_name}",
430            ),
431        })?;
432
433        let _ = meta_builder
434            .schema(Arc::new(new_schema))
435            .primary_key_indices(self.primary_key_indices.clone());
436
437        Ok(meta_builder)
438    }
439
440    /// Creates a [TableMetaBuilder] with modified column skipping index options.
441    fn change_column_skipping_index_options(
442        &self,
443        table_name: &str,
444        column_name: &str,
445        options: Option<&SkippingIndexOptions>,
446    ) -> Result<TableMetaBuilder> {
447        let table_schema = &self.schema;
448        let mut meta_builder = self.new_meta_builder();
449
450        let mut columns = Vec::with_capacity(table_schema.column_schemas().len());
451        for column_schema in table_schema.column_schemas() {
452            if column_schema.name == column_name {
453                let mut new_column_schema = column_schema.clone();
454                if let Some(options) = options {
455                    set_column_skipping_index_options(
456                        &mut new_column_schema,
457                        column_name,
458                        options,
459                    )?;
460                } else {
461                    unset_column_skipping_index_options(&mut new_column_schema, column_name)?;
462                }
463                columns.push(new_column_schema);
464            } else {
465                columns.push(column_schema.clone());
466            }
467        }
468
469        let mut builder = SchemaBuilder::try_from_columns(columns)
470            .with_context(|_| error::SchemaBuildSnafu {
471                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
472            })?
473            .version(table_schema.version() + 1);
474
475        for (k, v) in table_schema.metadata().iter() {
476            builder = builder.add_metadata(k, v);
477        }
478
479        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
480            msg: format!("Failed to convert column schemas into schema for table {table_name}"),
481        })?;
482
483        let _ = meta_builder
484            .schema(Arc::new(new_schema))
485            .primary_key_indices(self.primary_key_indices.clone());
486
487        Ok(meta_builder)
488    }
489
490    // TODO(yingwen): Remove this.
491    /// Allocate a new column for the table.
492    ///
493    /// This method would bump the `next_column_id` of the meta.
494    pub fn alloc_new_column(
495        &mut self,
496        table_name: &str,
497        new_column: &ColumnSchema,
498    ) -> Result<ColumnDescriptor> {
499        let desc = ColumnDescriptorBuilder::new(
500            self.next_column_id as ColumnId,
501            &new_column.name,
502            new_column.data_type.clone(),
503        )
504        .is_nullable(new_column.is_nullable())
505        .default_constraint(new_column.default_constraint().cloned())
506        .build()
507        .context(error::BuildColumnDescriptorSnafu {
508            table_name,
509            column_name: &new_column.name,
510        })?;
511
512        // Bump next column id.
513        self.next_column_id += 1;
514
515        Ok(desc)
516    }
517
518    /// Create a [`TableMetaBuilder`] from the current TableMeta.
519    fn new_meta_builder(&self) -> TableMetaBuilder {
520        let mut builder = TableMetaBuilder::from(self);
521        // Manually remove value_indices.
522        builder.value_indices = None;
523        builder
524    }
525
526    // TODO(yingwen): Tests add if not exists.
527    fn add_columns(
528        &self,
529        table_name: &str,
530        requests: &[AddColumnRequest],
531    ) -> Result<TableMetaBuilder> {
532        let table_schema = &self.schema;
533        let mut meta_builder = self.new_meta_builder();
534        let original_primary_key_indices: HashSet<&usize> =
535            self.primary_key_indices.iter().collect();
536
537        let mut names = HashSet::with_capacity(requests.len());
538        let mut new_columns = Vec::with_capacity(requests.len());
539        for col_to_add in requests {
540            if let Some(column_schema) =
541                table_schema.column_schema_by_name(&col_to_add.column_schema.name)
542            {
543                // If the column already exists.
544                ensure!(
545                    col_to_add.add_if_not_exists,
546                    error::ColumnExistsSnafu {
547                        table_name,
548                        column_name: &col_to_add.column_schema.name
549                    },
550                );
551
552                // Checks if the type is the same
553                ensure!(
554                    column_schema.data_type == col_to_add.column_schema.data_type,
555                    error::InvalidAlterRequestSnafu {
556                        table: table_name,
557                        err: format!(
558                            "column {} already exists with different type {:?}",
559                            col_to_add.column_schema.name, column_schema.data_type,
560                        ),
561                    }
562                );
563            } else {
564                // A new column.
565                // Ensures we only add a column once.
566                ensure!(
567                    names.insert(&col_to_add.column_schema.name),
568                    error::InvalidAlterRequestSnafu {
569                        table: table_name,
570                        err: format!(
571                            "add column {} more than once",
572                            col_to_add.column_schema.name
573                        ),
574                    }
575                );
576
577                ensure!(
578                    col_to_add.column_schema.is_nullable()
579                        || col_to_add.column_schema.default_constraint().is_some(),
580                    error::InvalidAlterRequestSnafu {
581                        table: table_name,
582                        err: format!(
583                            "no default value for column {}",
584                            col_to_add.column_schema.name
585                        ),
586                    },
587                );
588
589                new_columns.push(col_to_add.clone());
590            }
591        }
592        let requests = &new_columns[..];
593
594        let SplitResult {
595            columns_at_first,
596            columns_at_after,
597            columns_at_last,
598            column_names,
599        } = self.split_requests_by_column_location(table_name, requests)?;
600        let mut primary_key_indices = Vec::with_capacity(self.primary_key_indices.len());
601        let mut columns = Vec::with_capacity(table_schema.num_columns() + requests.len());
602        // add new columns with FIRST, and in reverse order of requests.
603        columns_at_first.iter().rev().for_each(|request| {
604            if request.is_key {
605                // If a key column is added, we also need to store its index in primary_key_indices.
606                primary_key_indices.push(columns.len());
607            }
608            columns.push(request.column_schema.clone());
609        });
610        // add existed columns in original order and handle new columns with AFTER.
611        for (index, column_schema) in table_schema.column_schemas().iter().enumerate() {
612            if original_primary_key_indices.contains(&index) {
613                primary_key_indices.push(columns.len());
614            }
615            columns.push(column_schema.clone());
616            if let Some(requests) = columns_at_after.get(&column_schema.name) {
617                requests.iter().rev().for_each(|request| {
618                    if request.is_key {
619                        // If a key column is added, we also need to store its index in primary_key_indices.
620                        primary_key_indices.push(columns.len());
621                    }
622                    columns.push(request.column_schema.clone());
623                });
624            }
625        }
626        // add new columns without location info to last.
627        columns_at_last.iter().for_each(|request| {
628            if request.is_key {
629                // If a key column is added, we also need to store its index in primary_key_indices.
630                primary_key_indices.push(columns.len());
631            }
632            columns.push(request.column_schema.clone());
633        });
634
635        let mut builder = SchemaBuilder::try_from(columns)
636            .with_context(|_| error::SchemaBuildSnafu {
637                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
638            })?
639            // Also bump the schema version.
640            .version(table_schema.version() + 1);
641        for (k, v) in table_schema.metadata().iter() {
642            builder = builder.add_metadata(k, v);
643        }
644        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
645            msg: format!("Table {table_name} cannot add new columns {column_names:?}"),
646        })?;
647
648        // value_indices would be generated automatically.
649        let _ = meta_builder
650            .schema(Arc::new(new_schema))
651            .primary_key_indices(primary_key_indices);
652
653        Ok(meta_builder)
654    }
655
656    fn remove_columns(
657        &self,
658        table_name: &str,
659        column_names: &[String],
660    ) -> Result<TableMetaBuilder> {
661        let table_schema = &self.schema;
662        let column_names: HashSet<_> = column_names.iter().collect();
663        let mut meta_builder = self.new_meta_builder();
664
665        let timestamp_index = table_schema.timestamp_index();
666        // Check whether columns are existing and not in primary key index.
667        for column_name in &column_names {
668            if let Some(index) = table_schema.column_index_by_name(column_name) {
669                // This is a linear search, but since there won't be too much columns, the performance should
670                // be acceptable.
671                ensure!(
672                    !self.primary_key_indices.contains(&index),
673                    error::RemoveColumnInIndexSnafu {
674                        column_name: *column_name,
675                        table_name,
676                    }
677                );
678
679                if let Some(ts_index) = timestamp_index {
680                    // Not allowed to remove column in timestamp index.
681                    ensure!(
682                        index != ts_index,
683                        error::RemoveColumnInIndexSnafu {
684                            column_name: table_schema.column_name_by_index(ts_index),
685                            table_name,
686                        }
687                    );
688                }
689            } else {
690                return error::ColumnNotExistsSnafu {
691                    column_name: *column_name,
692                    table_name,
693                }
694                .fail()?;
695            }
696        }
697
698        // Collect columns after removal.
699        let columns: Vec<_> = table_schema
700            .column_schemas()
701            .iter()
702            .filter(|column_schema| !column_names.contains(&column_schema.name))
703            .cloned()
704            .collect();
705
706        let mut builder = SchemaBuilder::try_from_columns(columns)
707            .with_context(|_| error::SchemaBuildSnafu {
708                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
709            })?
710            // Also bump the schema version.
711            .version(table_schema.version() + 1);
712        for (k, v) in table_schema.metadata().iter() {
713            builder = builder.add_metadata(k, v);
714        }
715        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
716            msg: format!("Table {table_name} cannot add remove columns {column_names:?}"),
717        })?;
718
719        // Rebuild the indices of primary key columns.
720        let primary_key_indices = self
721            .primary_key_indices
722            .iter()
723            .map(|idx| table_schema.column_name_by_index(*idx))
724            // This unwrap is safe since we don't allow removing a primary key column.
725            .map(|name| new_schema.column_index_by_name(name).unwrap())
726            .collect();
727
728        let _ = meta_builder
729            .schema(Arc::new(new_schema))
730            .primary_key_indices(primary_key_indices);
731
732        Ok(meta_builder)
733    }
734
735    fn modify_column_types(
736        &self,
737        table_name: &str,
738        requests: &[ModifyColumnTypeRequest],
739    ) -> Result<TableMetaBuilder> {
740        let table_schema = &self.schema;
741        let mut meta_builder = self.new_meta_builder();
742
743        let mut modify_column_types = HashMap::with_capacity(requests.len());
744        let timestamp_index = table_schema.timestamp_index();
745
746        for col_to_change in requests {
747            let change_column_name = &col_to_change.column_name;
748
749            let index = table_schema
750                .column_index_by_name(change_column_name)
751                .with_context(|| error::ColumnNotExistsSnafu {
752                    column_name: change_column_name,
753                    table_name,
754                })?;
755
756            let column = &table_schema.column_schemas()[index];
757
758            ensure!(
759                !self.primary_key_indices.contains(&index),
760                error::InvalidAlterRequestSnafu {
761                    table: table_name,
762                    err: format!(
763                        "Not allowed to change primary key index column '{}'",
764                        column.name
765                    )
766                }
767            );
768
769            if let Some(ts_index) = timestamp_index {
770                // Not allowed to change column datatype in timestamp index.
771                ensure!(
772                    index != ts_index,
773                    error::InvalidAlterRequestSnafu {
774                        table: table_name,
775                        err: format!(
776                            "Not allowed to change timestamp index column '{}' datatype",
777                            column.name
778                        )
779                    }
780                );
781            }
782
783            ensure!(
784                modify_column_types
785                    .insert(&col_to_change.column_name, col_to_change)
786                    .is_none(),
787                error::InvalidAlterRequestSnafu {
788                    table: table_name,
789                    err: format!(
790                        "change column datatype {} more than once",
791                        col_to_change.column_name
792                    ),
793                }
794            );
795
796            ensure!(
797                column
798                    .data_type
799                    .can_arrow_type_cast_to(&col_to_change.target_type),
800                error::InvalidAlterRequestSnafu {
801                    table: table_name,
802                    err: format!(
803                        "column '{}' cannot be cast automatically to type '{}'",
804                        col_to_change.column_name, col_to_change.target_type,
805                    ),
806                }
807            );
808
809            ensure!(
810                column.is_nullable(),
811                error::InvalidAlterRequestSnafu {
812                    table: table_name,
813                    err: format!(
814                        "column '{}' must be nullable to ensure safe conversion.",
815                        col_to_change.column_name,
816                    ),
817                }
818            );
819        }
820        // Collect columns after changed.
821
822        let mut columns: Vec<_> = Vec::with_capacity(table_schema.column_schemas().len());
823        for mut column in table_schema.column_schemas().iter().cloned() {
824            if let Some(change_column) = modify_column_types.get(&column.name) {
825                column.data_type = change_column.target_type.clone();
826                let new_default = if let Some(default_value) = column.default_constraint() {
827                    Some(
828                        default_value
829                            .cast_to_datatype(&change_column.target_type)
830                            .with_context(|_| error::CastDefaultValueSnafu {
831                                reason: format!(
832                                    "Failed to cast default value from {:?} to type {:?}",
833                                    default_value, &change_column.target_type
834                                ),
835                            })?,
836                    )
837                } else {
838                    None
839                };
840                column = column
841                    .clone()
842                    .with_default_constraint(new_default.clone())
843                    .with_context(|_| error::CastDefaultValueSnafu {
844                        reason: format!("Failed to set new default: {:?}", new_default),
845                    })?;
846            }
847            columns.push(column)
848        }
849
850        let mut builder = SchemaBuilder::try_from_columns(columns)
851            .with_context(|_| error::SchemaBuildSnafu {
852                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
853            })?
854            // Also bump the schema version.
855            .version(table_schema.version() + 1);
856        for (k, v) in table_schema.metadata().iter() {
857            builder = builder.add_metadata(k, v);
858        }
859        let new_schema = builder.build().with_context(|_| {
860            let column_names: Vec<_> = requests
861                .iter()
862                .map(|request| &request.column_name)
863                .collect();
864
865            error::SchemaBuildSnafu {
866                msg: format!(
867                    "Table {table_name} cannot change datatype with columns {column_names:?}"
868                ),
869            }
870        })?;
871
872        let _ = meta_builder
873            .schema(Arc::new(new_schema))
874            .primary_key_indices(self.primary_key_indices.clone());
875
876        Ok(meta_builder)
877    }
878
879    /// Split requests into different groups using column location info.
880    fn split_requests_by_column_location<'a>(
881        &self,
882        table_name: &str,
883        requests: &'a [AddColumnRequest],
884    ) -> Result<SplitResult<'a>> {
885        let table_schema = &self.schema;
886        let mut columns_at_first = Vec::new();
887        let mut columns_at_after = HashMap::new();
888        let mut columns_at_last = Vec::new();
889        let mut column_names = Vec::with_capacity(requests.len());
890        for request in requests {
891            // Check whether columns to add are already existing.
892            let column_name = &request.column_schema.name;
893            column_names.push(column_name.clone());
894            ensure!(
895                table_schema.column_schema_by_name(column_name).is_none(),
896                error::ColumnExistsSnafu {
897                    column_name,
898                    table_name,
899                }
900            );
901            match request.location.as_ref() {
902                Some(AddColumnLocation::First) => {
903                    columns_at_first.push(request);
904                }
905                Some(AddColumnLocation::After { column_name }) => {
906                    ensure!(
907                        table_schema.column_schema_by_name(column_name).is_some(),
908                        error::ColumnNotExistsSnafu {
909                            column_name,
910                            table_name,
911                        }
912                    );
913                    columns_at_after
914                        .entry(column_name.clone())
915                        .or_insert(Vec::new())
916                        .push(request);
917                }
918                None => {
919                    columns_at_last.push(request);
920                }
921            }
922        }
923        Ok(SplitResult {
924            columns_at_first,
925            columns_at_after,
926            columns_at_last,
927            column_names,
928        })
929    }
930
931    fn drop_defaults(&self, table_name: &str, column_names: &[String]) -> Result<TableMetaBuilder> {
932        let table_schema = &self.schema;
933        let mut meta_builder = self.new_meta_builder();
934        let mut columns = Vec::with_capacity(table_schema.num_columns());
935        for column_schema in table_schema.column_schemas() {
936            if let Some(name) = column_names.iter().find(|s| **s == column_schema.name) {
937                // Drop default constraint.
938                ensure!(
939                    column_schema.default_constraint().is_some(),
940                    error::InvalidAlterRequestSnafu {
941                        table: table_name,
942                        err: format!("column {name} does not have a default value"),
943                    }
944                );
945                if !column_schema.is_nullable() {
946                    return error::InvalidAlterRequestSnafu {
947                        table: table_name,
948                        err: format!(
949                            "column {name} is not nullable and `default` cannot be dropped",
950                        ),
951                    }
952                    .fail();
953                }
954                let new_column_schema = column_schema.clone();
955                let new_column_schema = new_column_schema
956                    .with_default_constraint(None)
957                    .with_context(|_| error::SchemaBuildSnafu {
958                        msg: format!("Table {table_name} cannot drop default values"),
959                    })?;
960                columns.push(new_column_schema);
961            } else {
962                columns.push(column_schema.clone());
963            }
964        }
965
966        let mut builder = SchemaBuilder::try_from_columns(columns)
967            .with_context(|_| error::SchemaBuildSnafu {
968                msg: format!("Failed to convert column schemas into schema for table {table_name}"),
969            })?
970            // Also bump the schema version.
971            .version(table_schema.version() + 1);
972        for (k, v) in table_schema.metadata().iter() {
973            builder = builder.add_metadata(k, v);
974        }
975        let new_schema = builder.build().with_context(|_| error::SchemaBuildSnafu {
976            msg: format!("Table {table_name} cannot drop default values"),
977        })?;
978
979        let _ = meta_builder.schema(Arc::new(new_schema));
980
981        Ok(meta_builder)
982    }
983}
984#[derive(Clone, Debug, PartialEq, Eq, Builder)]
985#[builder(pattern = "owned")]
986pub struct TableInfo {
987    /// Id and version of the table.
988    #[builder(default, setter(into))]
989    pub ident: TableIdent,
990    /// Name of the table.
991    #[builder(setter(into))]
992    pub name: String,
993    /// Comment of the table.
994    #[builder(default, setter(into))]
995    pub desc: Option<String>,
996    #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
997    pub catalog_name: String,
998    #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
999    pub schema_name: String,
1000    pub meta: TableMeta,
1001    #[builder(default = "TableType::Base")]
1002    pub table_type: TableType,
1003}
1004
1005pub type TableInfoRef = Arc<TableInfo>;
1006
1007impl TableInfo {
1008    pub fn table_id(&self) -> TableId {
1009        self.ident.table_id
1010    }
1011
1012    pub fn region_ids(&self) -> Vec<RegionId> {
1013        self.meta
1014            .region_numbers
1015            .iter()
1016            .map(|id| RegionId::new(self.table_id(), *id))
1017            .collect()
1018    }
1019    /// Returns the full table name in the form of `{catalog}.{schema}.{table}`.
1020    pub fn full_table_name(&self) -> String {
1021        common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
1022    }
1023
1024    /// Returns true when the table is the metric engine's physical table.
1025    pub fn is_physical_table(&self) -> bool {
1026        self.meta
1027            .options
1028            .extra_options
1029            .contains_key(PHYSICAL_TABLE_METADATA_KEY)
1030    }
1031
1032    /// Return true if the table's TTL is `instant`.
1033    pub fn is_ttl_instant_table(&self) -> bool {
1034        self.meta
1035            .options
1036            .ttl
1037            .map(|t| t.is_instant())
1038            .unwrap_or(false)
1039    }
1040}
1041
1042impl TableInfoBuilder {
1043    pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
1044        Self {
1045            name: Some(name.into()),
1046            meta: Some(meta),
1047            ..Default::default()
1048        }
1049    }
1050
1051    pub fn table_id(mut self, id: TableId) -> Self {
1052        let ident = self.ident.get_or_insert_with(TableIdent::default);
1053        ident.table_id = id;
1054        self
1055    }
1056
1057    pub fn table_version(mut self, version: TableVersion) -> Self {
1058        let ident = self.ident.get_or_insert_with(TableIdent::default);
1059        ident.version = version;
1060        self
1061    }
1062}
1063
1064impl TableIdent {
1065    pub fn new(table_id: TableId) -> Self {
1066        Self {
1067            table_id,
1068            version: 0,
1069        }
1070    }
1071}
1072
1073impl From<TableId> for TableIdent {
1074    fn from(table_id: TableId) -> Self {
1075        Self::new(table_id)
1076    }
1077}
1078
1079/// Struct used to serialize and deserialize [`TableMeta`].
1080#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
1081pub struct RawTableMeta {
1082    pub schema: RawSchema,
1083    /// The indices of columns in primary key. Note that the index of timestamp column
1084    /// is not included. Order matters to this array.
1085    pub primary_key_indices: Vec<usize>,
1086    ///  The indices of columns in value. Order doesn't matter to this array.
1087    pub value_indices: Vec<usize>,
1088    /// Engine type of this table. Usually in small case.
1089    pub engine: String,
1090    /// Next column id of a new column.
1091    /// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982
1092    pub next_column_id: ColumnId,
1093    pub region_numbers: Vec<u32>,
1094    pub options: TableOptions,
1095    pub created_on: DateTime<Utc>,
1096    /// Order doesn't matter to this array.
1097    #[serde(default)]
1098    pub partition_key_indices: Vec<usize>,
1099}
1100
1101impl From<TableMeta> for RawTableMeta {
1102    fn from(meta: TableMeta) -> RawTableMeta {
1103        RawTableMeta {
1104            schema: RawSchema::from(&*meta.schema),
1105            primary_key_indices: meta.primary_key_indices,
1106            value_indices: meta.value_indices,
1107            engine: meta.engine,
1108            next_column_id: meta.next_column_id,
1109            region_numbers: meta.region_numbers,
1110            options: meta.options,
1111            created_on: meta.created_on,
1112            partition_key_indices: meta.partition_key_indices,
1113        }
1114    }
1115}
1116
1117impl TryFrom<RawTableMeta> for TableMeta {
1118    type Error = ConvertError;
1119
1120    fn try_from(raw: RawTableMeta) -> ConvertResult<TableMeta> {
1121        Ok(TableMeta {
1122            schema: Arc::new(Schema::try_from(raw.schema)?),
1123            primary_key_indices: raw.primary_key_indices,
1124            value_indices: raw.value_indices,
1125            engine: raw.engine,
1126            region_numbers: raw.region_numbers,
1127            next_column_id: raw.next_column_id,
1128            options: raw.options,
1129            created_on: raw.created_on,
1130            partition_key_indices: raw.partition_key_indices,
1131        })
1132    }
1133}
1134
1135/// Struct used to serialize and deserialize [`TableInfo`].
1136#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
1137pub struct RawTableInfo {
1138    pub ident: TableIdent,
1139    pub name: String,
1140    pub desc: Option<String>,
1141    pub catalog_name: String,
1142    pub schema_name: String,
1143    pub meta: RawTableMeta,
1144    pub table_type: TableType,
1145}
1146
1147impl RawTableInfo {
1148    /// Sort the columns in [RawTableInfo], logical tables require it.
1149    pub fn sort_columns(&mut self) {
1150        let column_schemas = &self.meta.schema.column_schemas;
1151        let primary_keys = self
1152            .meta
1153            .primary_key_indices
1154            .iter()
1155            .map(|index| column_schemas[*index].name.clone())
1156            .collect::<HashSet<_>>();
1157
1158        self.meta
1159            .schema
1160            .column_schemas
1161            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
1162
1163        // Compute new indices of sorted columns
1164        let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
1165        let mut timestamp_index = None;
1166        let mut value_indices =
1167            Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
1168        for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
1169            if primary_keys.contains(&column_schema.name) {
1170                primary_key_indices.push(index);
1171            } else if column_schema.is_time_index() {
1172                timestamp_index = Some(index);
1173            } else {
1174                value_indices.push(index);
1175            }
1176        }
1177
1178        // Overwrite table meta
1179        self.meta.schema.timestamp_index = timestamp_index;
1180        self.meta.primary_key_indices = primary_key_indices;
1181        self.meta.value_indices = value_indices;
1182    }
1183
1184    /// Extracts region options from table info.
1185    ///
1186    /// All "region options" are actually a copy of table options for redundancy.
1187    pub fn to_region_options(&self) -> HashMap<String, String> {
1188        HashMap::from(&self.meta.options)
1189    }
1190}
1191
1192impl From<TableInfo> for RawTableInfo {
1193    fn from(info: TableInfo) -> RawTableInfo {
1194        RawTableInfo {
1195            ident: info.ident,
1196            name: info.name,
1197            desc: info.desc,
1198            catalog_name: info.catalog_name,
1199            schema_name: info.schema_name,
1200            meta: RawTableMeta::from(info.meta),
1201            table_type: info.table_type,
1202        }
1203    }
1204}
1205
1206impl TryFrom<RawTableInfo> for TableInfo {
1207    type Error = ConvertError;
1208
1209    fn try_from(raw: RawTableInfo) -> ConvertResult<TableInfo> {
1210        Ok(TableInfo {
1211            ident: raw.ident,
1212            name: raw.name,
1213            desc: raw.desc,
1214            catalog_name: raw.catalog_name,
1215            schema_name: raw.schema_name,
1216            meta: TableMeta::try_from(raw.meta)?,
1217            table_type: raw.table_type,
1218        })
1219    }
1220}
1221
1222/// Set column fulltext options if it passed the validation.
1223///
1224/// Options allowed to modify:
1225/// * backend
1226///
1227/// Options not allowed to modify:
1228/// * analyzer
1229/// * case_sensitive
1230fn set_column_fulltext_options(
1231    column_schema: &mut ColumnSchema,
1232    column_name: &str,
1233    options: &FulltextOptions,
1234    current_options: Option<FulltextOptions>,
1235) -> Result<()> {
1236    if let Some(current_options) = current_options {
1237        ensure!(
1238            current_options.analyzer == options.analyzer
1239                && current_options.case_sensitive == options.case_sensitive,
1240            error::InvalidColumnOptionSnafu {
1241                column_name,
1242                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1243                current_options.analyzer, current_options.case_sensitive),
1244            }
1245        );
1246    }
1247
1248    column_schema
1249        .set_fulltext_options(options)
1250        .context(error::SetFulltextOptionsSnafu { column_name })?;
1251
1252    Ok(())
1253}
1254
1255fn unset_column_fulltext_options(
1256    column_schema: &mut ColumnSchema,
1257    column_name: &str,
1258    current_options: Option<FulltextOptions>,
1259) -> Result<()> {
1260    ensure!(
1261        current_options
1262            .as_ref()
1263            .is_some_and(|options| options.enable),
1264        error::InvalidColumnOptionSnafu {
1265            column_name,
1266            msg: "FULLTEXT index already disabled".to_string(),
1267        }
1268    );
1269
1270    let mut options = current_options.unwrap();
1271    options.enable = false;
1272    column_schema
1273        .set_fulltext_options(&options)
1274        .context(error::SetFulltextOptionsSnafu { column_name })?;
1275
1276    Ok(())
1277}
1278
1279fn set_column_skipping_index_options(
1280    column_schema: &mut ColumnSchema,
1281    column_name: &str,
1282    options: &SkippingIndexOptions,
1283) -> Result<()> {
1284    column_schema
1285        .set_skipping_options(options)
1286        .context(error::SetSkippingOptionsSnafu { column_name })?;
1287
1288    Ok(())
1289}
1290
1291fn unset_column_skipping_index_options(
1292    column_schema: &mut ColumnSchema,
1293    column_name: &str,
1294) -> Result<()> {
1295    column_schema
1296        .unset_skipping_options()
1297        .context(error::UnsetSkippingOptionsSnafu { column_name })?;
1298    Ok(())
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303    use common_error::ext::ErrorExt;
1304    use common_error::status_code::StatusCode;
1305    use datatypes::data_type::ConcreteDataType;
1306    use datatypes::schema::{
1307        ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SchemaBuilder,
1308    };
1309
1310    use super::*;
1311
1312    /// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
1313    fn new_test_schema() -> Schema {
1314        let column_schemas = vec![
1315            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1316            ColumnSchema::new(
1317                "ts",
1318                ConcreteDataType::timestamp_millisecond_datatype(),
1319                false,
1320            )
1321            .with_time_index(true),
1322            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1323        ];
1324        SchemaBuilder::try_from(column_schemas)
1325            .unwrap()
1326            .version(123)
1327            .build()
1328            .unwrap()
1329    }
1330
1331    #[test]
1332    fn test_raw_convert() {
1333        let schema = Arc::new(new_test_schema());
1334        let meta = TableMetaBuilder::empty()
1335            .schema(schema)
1336            .primary_key_indices(vec![0])
1337            .engine("engine")
1338            .next_column_id(3)
1339            .build()
1340            .unwrap();
1341        let info = TableInfoBuilder::default()
1342            .table_id(10)
1343            .table_version(5)
1344            .name("mytable")
1345            .meta(meta)
1346            .build()
1347            .unwrap();
1348
1349        let raw = RawTableInfo::from(info.clone());
1350        let info_new = TableInfo::try_from(raw).unwrap();
1351
1352        assert_eq!(info, info_new);
1353    }
1354
1355    fn add_columns_to_meta(meta: &TableMeta) -> TableMeta {
1356        let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
1357        let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
1358        let alter_kind = AlterKind::AddColumns {
1359            columns: vec![
1360                AddColumnRequest {
1361                    column_schema: new_tag,
1362                    is_key: true,
1363                    location: None,
1364                    add_if_not_exists: false,
1365                },
1366                AddColumnRequest {
1367                    column_schema: new_field,
1368                    is_key: false,
1369                    location: None,
1370                    add_if_not_exists: false,
1371                },
1372            ],
1373        };
1374
1375        let builder = meta
1376            .builder_with_alter_kind("my_table", &alter_kind)
1377            .unwrap();
1378        builder.build().unwrap()
1379    }
1380
1381    fn add_columns_to_meta_with_location(meta: &TableMeta) -> TableMeta {
1382        let new_tag = ColumnSchema::new("my_tag_first", ConcreteDataType::string_datatype(), true);
1383        let new_field = ColumnSchema::new(
1384            "my_field_after_ts",
1385            ConcreteDataType::string_datatype(),
1386            true,
1387        );
1388        let alter_kind = AlterKind::AddColumns {
1389            columns: vec![
1390                AddColumnRequest {
1391                    column_schema: new_tag,
1392                    is_key: true,
1393                    location: Some(AddColumnLocation::First),
1394                    add_if_not_exists: false,
1395                },
1396                AddColumnRequest {
1397                    column_schema: new_field,
1398                    is_key: false,
1399                    location: Some(AddColumnLocation::After {
1400                        column_name: "ts".to_string(),
1401                    }),
1402                    add_if_not_exists: false,
1403                },
1404            ],
1405        };
1406
1407        let builder = meta
1408            .builder_with_alter_kind("my_table", &alter_kind)
1409            .unwrap();
1410        builder.build().unwrap()
1411    }
1412
1413    #[test]
1414    fn test_add_columns() {
1415        let schema = Arc::new(new_test_schema());
1416        let meta = TableMetaBuilder::empty()
1417            .schema(schema)
1418            .primary_key_indices(vec![0])
1419            .engine("engine")
1420            .next_column_id(3)
1421            .build()
1422            .unwrap();
1423
1424        let new_meta = add_columns_to_meta(&meta);
1425        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1426
1427        let names: Vec<String> = new_meta
1428            .schema
1429            .column_schemas()
1430            .iter()
1431            .map(|column_schema| column_schema.name.clone())
1432            .collect();
1433        assert_eq!(&["col1", "ts", "col2", "my_tag", "my_field"], &names[..]);
1434        assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
1435        assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
1436    }
1437
1438    #[test]
1439    fn test_add_columns_multiple_times() {
1440        let schema = Arc::new(new_test_schema());
1441        let meta = TableMetaBuilder::empty()
1442            .schema(schema)
1443            .primary_key_indices(vec![0])
1444            .engine("engine")
1445            .next_column_id(3)
1446            .build()
1447            .unwrap();
1448
1449        let alter_kind = AlterKind::AddColumns {
1450            columns: vec![
1451                AddColumnRequest {
1452                    column_schema: ColumnSchema::new(
1453                        "col3",
1454                        ConcreteDataType::int32_datatype(),
1455                        true,
1456                    ),
1457                    is_key: true,
1458                    location: None,
1459                    add_if_not_exists: true,
1460                },
1461                AddColumnRequest {
1462                    column_schema: ColumnSchema::new(
1463                        "col3",
1464                        ConcreteDataType::int32_datatype(),
1465                        true,
1466                    ),
1467                    is_key: true,
1468                    location: None,
1469                    add_if_not_exists: true,
1470                },
1471            ],
1472        };
1473        let err = meta
1474            .builder_with_alter_kind("my_table", &alter_kind)
1475            .err()
1476            .unwrap();
1477        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1478    }
1479
1480    #[test]
1481    fn test_remove_columns() {
1482        let schema = Arc::new(new_test_schema());
1483        let meta = TableMetaBuilder::empty()
1484            .schema(schema.clone())
1485            .primary_key_indices(vec![0])
1486            .engine("engine")
1487            .next_column_id(3)
1488            .build()
1489            .unwrap();
1490        // Add more columns so we have enough candidate columns to remove.
1491        let meta = add_columns_to_meta(&meta);
1492
1493        let alter_kind = AlterKind::DropColumns {
1494            names: vec![String::from("col2"), String::from("my_field")],
1495        };
1496        let new_meta = meta
1497            .builder_with_alter_kind("my_table", &alter_kind)
1498            .unwrap()
1499            .build()
1500            .unwrap();
1501
1502        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1503
1504        let names: Vec<String> = new_meta
1505            .schema
1506            .column_schemas()
1507            .iter()
1508            .map(|column_schema| column_schema.name.clone())
1509            .collect();
1510        assert_eq!(&["col1", "ts", "my_tag"], &names[..]);
1511        assert_eq!(&[0, 2], &new_meta.primary_key_indices[..]);
1512        assert_eq!(&[1], &new_meta.value_indices[..]);
1513        assert_eq!(
1514            schema.timestamp_column(),
1515            new_meta.schema.timestamp_column()
1516        );
1517    }
1518
1519    #[test]
1520    fn test_remove_multiple_columns_before_timestamp() {
1521        let column_schemas = vec![
1522            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1523            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1524            ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1525            ColumnSchema::new(
1526                "ts",
1527                ConcreteDataType::timestamp_millisecond_datatype(),
1528                false,
1529            )
1530            .with_time_index(true),
1531        ];
1532        let schema = Arc::new(
1533            SchemaBuilder::try_from(column_schemas)
1534                .unwrap()
1535                .version(123)
1536                .build()
1537                .unwrap(),
1538        );
1539        let meta = TableMetaBuilder::empty()
1540            .schema(schema.clone())
1541            .primary_key_indices(vec![1])
1542            .engine("engine")
1543            .next_column_id(4)
1544            .build()
1545            .unwrap();
1546
1547        // Remove columns in reverse order to test whether timestamp index is valid.
1548        let alter_kind = AlterKind::DropColumns {
1549            names: vec![String::from("col3"), String::from("col1")],
1550        };
1551        let new_meta = meta
1552            .builder_with_alter_kind("my_table", &alter_kind)
1553            .unwrap()
1554            .build()
1555            .unwrap();
1556
1557        let names: Vec<String> = new_meta
1558            .schema
1559            .column_schemas()
1560            .iter()
1561            .map(|column_schema| column_schema.name.clone())
1562            .collect();
1563        assert_eq!(&["col2", "ts"], &names[..]);
1564        assert_eq!(&[0], &new_meta.primary_key_indices[..]);
1565        assert_eq!(&[1], &new_meta.value_indices[..]);
1566        assert_eq!(
1567            schema.timestamp_column(),
1568            new_meta.schema.timestamp_column()
1569        );
1570    }
1571
1572    #[test]
1573    fn test_add_existing_column() {
1574        let schema = Arc::new(new_test_schema());
1575        let meta = TableMetaBuilder::empty()
1576            .schema(schema)
1577            .primary_key_indices(vec![0])
1578            .engine("engine")
1579            .next_column_id(3)
1580            .build()
1581            .unwrap();
1582
1583        let alter_kind = AlterKind::AddColumns {
1584            columns: vec![AddColumnRequest {
1585                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1586                is_key: false,
1587                location: None,
1588                add_if_not_exists: false,
1589            }],
1590        };
1591
1592        let err = meta
1593            .builder_with_alter_kind("my_table", &alter_kind)
1594            .err()
1595            .unwrap();
1596        assert_eq!(StatusCode::TableColumnExists, err.status_code());
1597
1598        // Add if not exists
1599        let alter_kind = AlterKind::AddColumns {
1600            columns: vec![AddColumnRequest {
1601                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
1602                is_key: true,
1603                location: None,
1604                add_if_not_exists: true,
1605            }],
1606        };
1607        let new_meta = meta
1608            .builder_with_alter_kind("my_table", &alter_kind)
1609            .unwrap()
1610            .build()
1611            .unwrap();
1612        assert_eq!(
1613            meta.schema.column_schemas(),
1614            new_meta.schema.column_schemas()
1615        );
1616        assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
1617    }
1618
1619    #[test]
1620    fn test_add_different_type_column() {
1621        let schema = Arc::new(new_test_schema());
1622        let meta = TableMetaBuilder::empty()
1623            .schema(schema)
1624            .primary_key_indices(vec![0])
1625            .engine("engine")
1626            .next_column_id(3)
1627            .build()
1628            .unwrap();
1629
1630        // Add if not exists, but different type.
1631        let alter_kind = AlterKind::AddColumns {
1632            columns: vec![AddColumnRequest {
1633                column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
1634                is_key: false,
1635                location: None,
1636                add_if_not_exists: true,
1637            }],
1638        };
1639        let err = meta
1640            .builder_with_alter_kind("my_table", &alter_kind)
1641            .err()
1642            .unwrap();
1643        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1644    }
1645
1646    #[test]
1647    fn test_add_invalid_column() {
1648        let schema = Arc::new(new_test_schema());
1649        let meta = TableMetaBuilder::empty()
1650            .schema(schema)
1651            .primary_key_indices(vec![0])
1652            .engine("engine")
1653            .next_column_id(3)
1654            .build()
1655            .unwrap();
1656
1657        // Not nullable and no default value.
1658        let alter_kind = AlterKind::AddColumns {
1659            columns: vec![AddColumnRequest {
1660                column_schema: ColumnSchema::new(
1661                    "weny",
1662                    ConcreteDataType::string_datatype(),
1663                    false,
1664                ),
1665                is_key: false,
1666                location: None,
1667                add_if_not_exists: false,
1668            }],
1669        };
1670
1671        let err = meta
1672            .builder_with_alter_kind("my_table", &alter_kind)
1673            .err()
1674            .unwrap();
1675        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1676    }
1677
1678    #[test]
1679    fn test_remove_unknown_column() {
1680        let schema = Arc::new(new_test_schema());
1681        let meta = TableMetaBuilder::empty()
1682            .schema(schema)
1683            .primary_key_indices(vec![0])
1684            .engine("engine")
1685            .next_column_id(3)
1686            .build()
1687            .unwrap();
1688
1689        let alter_kind = AlterKind::DropColumns {
1690            names: vec![String::from("unknown")],
1691        };
1692
1693        let err = meta
1694            .builder_with_alter_kind("my_table", &alter_kind)
1695            .err()
1696            .unwrap();
1697        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1698    }
1699
1700    #[test]
1701    fn test_change_unknown_column_data_type() {
1702        let schema = Arc::new(new_test_schema());
1703        let meta = TableMetaBuilder::empty()
1704            .schema(schema)
1705            .primary_key_indices(vec![0])
1706            .engine("engine")
1707            .next_column_id(3)
1708            .build()
1709            .unwrap();
1710
1711        let alter_kind = AlterKind::ModifyColumnTypes {
1712            columns: vec![ModifyColumnTypeRequest {
1713                column_name: "unknown".to_string(),
1714                target_type: ConcreteDataType::string_datatype(),
1715            }],
1716        };
1717
1718        let err = meta
1719            .builder_with_alter_kind("my_table", &alter_kind)
1720            .err()
1721            .unwrap();
1722        assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
1723    }
1724
1725    #[test]
1726    fn test_remove_key_column() {
1727        let schema = Arc::new(new_test_schema());
1728        let meta = TableMetaBuilder::empty()
1729            .schema(schema)
1730            .primary_key_indices(vec![0])
1731            .engine("engine")
1732            .next_column_id(3)
1733            .build()
1734            .unwrap();
1735
1736        // Remove column in primary key.
1737        let alter_kind = AlterKind::DropColumns {
1738            names: vec![String::from("col1")],
1739        };
1740
1741        let err = meta
1742            .builder_with_alter_kind("my_table", &alter_kind)
1743            .err()
1744            .unwrap();
1745        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1746
1747        // Remove timestamp column.
1748        let alter_kind = AlterKind::DropColumns {
1749            names: vec![String::from("ts")],
1750        };
1751
1752        let err = meta
1753            .builder_with_alter_kind("my_table", &alter_kind)
1754            .err()
1755            .unwrap();
1756        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1757    }
1758
1759    #[test]
1760    fn test_change_key_column_data_type() {
1761        let schema = Arc::new(new_test_schema());
1762        let meta = TableMetaBuilder::empty()
1763            .schema(schema)
1764            .primary_key_indices(vec![0])
1765            .engine("engine")
1766            .next_column_id(3)
1767            .build()
1768            .unwrap();
1769
1770        // Remove column in primary key.
1771        let alter_kind = AlterKind::ModifyColumnTypes {
1772            columns: vec![ModifyColumnTypeRequest {
1773                column_name: "col1".to_string(),
1774                target_type: ConcreteDataType::string_datatype(),
1775            }],
1776        };
1777
1778        let err = meta
1779            .builder_with_alter_kind("my_table", &alter_kind)
1780            .err()
1781            .unwrap();
1782        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1783
1784        // Remove timestamp column.
1785        let alter_kind = AlterKind::ModifyColumnTypes {
1786            columns: vec![ModifyColumnTypeRequest {
1787                column_name: "ts".to_string(),
1788                target_type: ConcreteDataType::string_datatype(),
1789            }],
1790        };
1791
1792        let err = meta
1793            .builder_with_alter_kind("my_table", &alter_kind)
1794            .err()
1795            .unwrap();
1796        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1797    }
1798
1799    #[test]
1800    fn test_alloc_new_column() {
1801        let schema = Arc::new(new_test_schema());
1802        let mut meta = TableMetaBuilder::empty()
1803            .schema(schema)
1804            .primary_key_indices(vec![0])
1805            .engine("engine")
1806            .next_column_id(3)
1807            .build()
1808            .unwrap();
1809        assert_eq!(3, meta.next_column_id);
1810
1811        let column_schema = ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true);
1812        let desc = meta.alloc_new_column("test_table", &column_schema).unwrap();
1813
1814        assert_eq!(4, meta.next_column_id);
1815        assert_eq!(column_schema.name, desc.name);
1816    }
1817
1818    #[test]
1819    fn test_add_columns_with_location() {
1820        let schema = Arc::new(new_test_schema());
1821        let meta = TableMetaBuilder::empty()
1822            .schema(schema)
1823            .primary_key_indices(vec![0])
1824            .engine("engine")
1825            .next_column_id(3)
1826            .build()
1827            .unwrap();
1828
1829        let new_meta = add_columns_to_meta_with_location(&meta);
1830        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1831
1832        let names: Vec<String> = new_meta
1833            .schema
1834            .column_schemas()
1835            .iter()
1836            .map(|column_schema| column_schema.name.clone())
1837            .collect();
1838        assert_eq!(
1839            &["my_tag_first", "col1", "ts", "my_field_after_ts", "col2"],
1840            &names[..]
1841        );
1842        assert_eq!(&[0, 1], &new_meta.primary_key_indices[..]);
1843        assert_eq!(&[2, 3, 4], &new_meta.value_indices[..]);
1844    }
1845
1846    #[test]
1847    fn test_modify_column_fulltext_options() {
1848        let schema = Arc::new(new_test_schema());
1849        let meta = TableMetaBuilder::empty()
1850            .schema(schema)
1851            .primary_key_indices(vec![0])
1852            .engine("engine")
1853            .next_column_id(3)
1854            .build()
1855            .unwrap();
1856
1857        let alter_kind = AlterKind::SetIndex {
1858            options: SetIndexOptions::Fulltext {
1859                column_name: "col1".to_string(),
1860                options: FulltextOptions::default(),
1861            },
1862        };
1863        let err = meta
1864            .builder_with_alter_kind("my_table", &alter_kind)
1865            .err()
1866            .unwrap();
1867        assert_eq!(
1868            "Invalid column option, column name: col1, error: FULLTEXT index only supports string type",
1869            err.to_string()
1870        );
1871
1872        // Add a string column and make it fulltext indexed
1873        let new_meta = add_columns_to_meta_with_location(&meta);
1874        assert_eq!(meta.region_numbers, new_meta.region_numbers);
1875
1876        let alter_kind = AlterKind::SetIndex {
1877            options: SetIndexOptions::Fulltext {
1878                column_name: "my_tag_first".to_string(),
1879                options: FulltextOptions {
1880                    enable: true,
1881                    analyzer: FulltextAnalyzer::Chinese,
1882                    case_sensitive: true,
1883                    backend: FulltextBackend::Bloom,
1884                },
1885            },
1886        };
1887        let new_meta = new_meta
1888            .builder_with_alter_kind("my_table", &alter_kind)
1889            .unwrap()
1890            .build()
1891            .unwrap();
1892        let column_schema = new_meta
1893            .schema
1894            .column_schema_by_name("my_tag_first")
1895            .unwrap();
1896        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1897        assert!(fulltext_options.enable);
1898        assert_eq!(
1899            datatypes::schema::FulltextAnalyzer::Chinese,
1900            fulltext_options.analyzer
1901        );
1902        assert!(fulltext_options.case_sensitive);
1903
1904        let alter_kind = AlterKind::UnsetIndex {
1905            options: UnsetIndexOptions::Fulltext {
1906                column_name: "my_tag_first".to_string(),
1907            },
1908        };
1909        let new_meta = new_meta
1910            .builder_with_alter_kind("my_table", &alter_kind)
1911            .unwrap()
1912            .build()
1913            .unwrap();
1914        let column_schema = new_meta
1915            .schema
1916            .column_schema_by_name("my_tag_first")
1917            .unwrap();
1918        let fulltext_options = column_schema.fulltext_options().unwrap().unwrap();
1919        assert!(!fulltext_options.enable);
1920    }
1921}