store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
33use datatypes::types::TimestampType;
34use itertools::Itertools;
35use serde::de::Error;
36use serde::{Deserialize, Deserializer, Serialize};
37use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
38
39use crate::codec::PrimaryKeyEncoding;
40use crate::region_request::{
41    AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type of this module; the error is always [MetadataError].
pub type Result<T> = std::result::Result<T, MetadataError>;
47
/// Metadata of a column.
///
/// `Debug` is not derived; this type has a hand-written, compact `Debug` implementation.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Id of this column, unique among the columns of a region.
    pub column_id: ColumnId,
}
58
59impl fmt::Debug for ColumnMetadata {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(
62            f,
63            "[{:?} {:?} {:?}]",
64            self.column_schema, self.semantic_type, self.column_id,
65        )
66    }
67}
68
69impl ColumnMetadata {
70    /// Construct `Self` from protobuf struct [RegionColumnDef]
71    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72        let column_id = column_def.column_id;
73        let column_def = column_def
74            .column_def
75            .context(InvalidRawRegionRequestSnafu {
76                err: "column_def is absent",
77            })?;
78        let semantic_type = column_def.semantic_type();
79        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81        Ok(Self {
82            column_schema,
83            semantic_type,
84            column_id,
85        })
86    }
87
88    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
89    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90        serde_json::to_vec(columns)
91    }
92
93    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
94    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95        serde_json::from_slice(bytes)
96    }
97
98    pub fn is_same_datatype(&self, other: &Self) -> bool {
99        self.column_schema.data_type == other.column_schema.data_type
100    }
101}
102
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    ///
    /// Not serialized; it is rebuilt from the column metadatas on deserialization.
    #[serde(skip)]
    pub schema: SchemaRef,

    // `time_index` and `id_to_index` are not public and are always constructed
    // via [SkippedFields], so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys.
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Partition expression serialized as a JSON string.
    ///
    /// Compatibility behavior:
    /// - `None`: no partition expr was ever set in the manifest (legacy regions).
    /// - `Some("")`: an explicit "single-region/no-partition" designation. This is
    ///   distinct from `None` and should be preserved as-is.
    pub partition_expr: Option<String>,
}
164
165impl fmt::Debug for RegionMetadata {
166    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167        f.debug_struct("RegionMetadata")
168            .field("column_metadatas", &self.column_metadatas)
169            .field("time_index", &self.time_index)
170            .field("primary_key", &self.primary_key)
171            .field("region_id", &self.region_id)
172            .field("schema_version", &self.schema_version)
173            .field("partition_expr", &self.partition_expr)
174            .finish()
175    }
176}
177
/// Shared ([Arc]) reference to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
179
180impl<'de> Deserialize<'de> for RegionMetadata {
181    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
182    where
183        D: Deserializer<'de>,
184    {
185        // helper internal struct for deserialization
186        #[derive(Deserialize)]
187        struct RegionMetadataWithoutSchema {
188            column_metadatas: Vec<ColumnMetadata>,
189            primary_key: Vec<ColumnId>,
190            region_id: RegionId,
191            schema_version: u64,
192            #[serde(default)]
193            primary_key_encoding: PrimaryKeyEncoding,
194            #[serde(default)]
195            partition_expr: Option<String>,
196        }
197
198        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
199        let skipped =
200            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
201
202        Ok(Self {
203            schema: skipped.schema,
204            time_index: skipped.time_index,
205            id_to_index: skipped.id_to_index,
206            column_metadatas: without_schema.column_metadatas,
207            primary_key: without_schema.primary_key,
208            region_id: without_schema.region_id,
209            schema_version: without_schema.schema_version,
210            primary_key_encoding: without_schema.primary_key_encoding,
211            partition_expr: without_schema.partition_expr,
212        })
213    }
214}
215
216impl RegionMetadata {
217    /// Decode the metadata from a JSON str.
218    pub fn from_json(s: &str) -> Result<Self> {
219        serde_json::from_str(s).context(SerdeJsonSnafu)
220    }
221
222    /// Encode the metadata to a JSON string.
223    pub fn to_json(&self) -> Result<String> {
224        serde_json::to_string(&self).context(SerdeJsonSnafu)
225    }
226
227    /// Find column by id.
228    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
229        self.id_to_index
230            .get(&column_id)
231            .map(|index| &self.column_metadatas[*index])
232    }
233
234    /// Find column index by id.
235    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
236        self.id_to_index.get(&column_id).copied()
237    }
238
239    /// Find column index by name.
240    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
241        self.column_metadatas
242            .iter()
243            .position(|col| col.column_schema.name == column_name)
244    }
245
246    /// Returns the time index column
247    ///
248    /// # Panics
249    /// Panics if the time index column id is invalid.
250    pub fn time_index_column(&self) -> &ColumnMetadata {
251        let index = self.id_to_index[&self.time_index];
252        &self.column_metadatas[index]
253    }
254
255    /// Returns timestamp type of time index column
256    ///
257    /// # Panics
258    /// Panics if the time index column id is invalid.
259    pub fn time_index_type(&self) -> TimestampType {
260        let index = self.id_to_index[&self.time_index];
261        self.column_metadatas[index]
262            .column_schema
263            .data_type
264            .as_timestamp()
265            .unwrap()
266    }
267
268    /// Returns the position of the time index.
269    pub fn time_index_column_pos(&self) -> usize {
270        self.id_to_index[&self.time_index]
271    }
272
273    /// Returns the arrow field of the time index column.
274    pub fn time_index_field(&self) -> FieldRef {
275        let index = self.id_to_index[&self.time_index];
276        self.schema.arrow_schema().fields[index].clone()
277    }
278
279    /// Finds a column by name.
280    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
281        self.schema
282            .column_index_by_name(name)
283            .map(|index| &self.column_metadatas[index])
284    }
285
286    /// Returns all primary key columns.
287    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288        // safety: RegionMetadata::validate ensures every primary key exists.
289        self.primary_key
290            .iter()
291            .map(|id| self.column_by_id(*id).unwrap())
292    }
293
294    /// Returns all field columns before projection.
295    ///
296    /// **Use with caution**. On read path where might have projection, this method
297    /// can return columns that not present in data batch.
298    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
299        self.column_metadatas
300            .iter()
301            .filter(|column| column.semantic_type == SemanticType::Field)
302    }
303
304    /// Returns a column's index in primary key if it is a primary key column.
305    ///
306    /// This does a linear search.
307    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
308        self.primary_key.iter().position(|id| *id == column_id)
309    }
310
311    /// Project the metadata to a new one using specified column ids.
312    ///
313    /// [RegionId] and schema version are preserved.
314    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
315        // check time index
316        ensure!(
317            projection.contains(&self.time_index),
318            TimeIndexNotFoundSnafu
319        );
320
321        // prepare new indices
322        let indices_to_preserve = projection
323            .iter()
324            .map(|id| {
325                self.column_index_by_id(*id)
326                    .with_context(|| InvalidRegionRequestSnafu {
327                        region_id: self.region_id,
328                        err: format!("column id {} not found", id),
329                    })
330            })
331            .collect::<Result<Vec<_>>>()?;
332
333        // project schema
334        let projected_schema =
335            self.schema
336                .try_project(&indices_to_preserve)
337                .with_context(|_| SchemaProjectSnafu {
338                    origin_schema: self.schema.clone(),
339                    projection: projection.to_vec(),
340                })?;
341
342        // project columns, generate projected primary key and new id_to_index
343        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
344        let mut projected_primary_key = vec![];
345        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
346        for index in indices_to_preserve {
347            let col = self.column_metadatas[index].clone();
348            if col.semantic_type == SemanticType::Tag {
349                projected_primary_key.push(col.column_id);
350            }
351            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
352            projected_column_metadatas.push(col);
353        }
354
355        Ok(RegionMetadata {
356            schema: Arc::new(projected_schema),
357            time_index: self.time_index,
358            id_to_index: projected_id_to_index,
359            column_metadatas: projected_column_metadatas,
360            primary_key: projected_primary_key,
361            region_id: self.region_id,
362            schema_version: self.schema_version,
363            primary_key_encoding: self.primary_key_encoding,
364            partition_expr: self.partition_expr.clone(),
365        })
366    }
367
368    /// Gets the column ids to be indexed by inverted index.
369    pub fn inverted_indexed_column_ids<'a>(
370        &self,
371        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
372    ) -> HashSet<ColumnId> {
373        let mut inverted_index = self
374            .column_metadatas
375            .iter()
376            .filter(|column| column.column_schema.is_inverted_indexed())
377            .map(|column| column.column_id)
378            .collect::<HashSet<_>>();
379
380        for ignored in ignore_column_ids {
381            inverted_index.remove(ignored);
382        }
383
384        inverted_index
385    }
386
387    /// Checks whether the metadata is valid.
388    fn validate(&self) -> Result<()> {
389        // Id to name.
390        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
391        for col in &self.column_metadatas {
392            // Validate each column.
393            Self::validate_column_metadata(col)?;
394
395            // Check whether column id is duplicated. We already check column name
396            // is unique in `Schema` so we only check column id here.
397            ensure!(
398                !id_names.contains_key(&col.column_id),
399                InvalidMetaSnafu {
400                    reason: format!(
401                        "column {} and {} have the same column id {}",
402                        id_names[&col.column_id], col.column_schema.name, col.column_id,
403                    ),
404                }
405            );
406            id_names.insert(col.column_id, &col.column_schema.name);
407        }
408
409        // Checks there is only one time index.
410        let time_indexes = self
411            .column_metadatas
412            .iter()
413            .filter(|col| col.semantic_type == SemanticType::Timestamp)
414            .collect::<Vec<_>>();
415        ensure!(
416            time_indexes.len() == 1,
417            InvalidMetaSnafu {
418                reason: format!(
419                    "expect only one time index, found {}: {}",
420                    time_indexes.len(),
421                    time_indexes
422                        .iter()
423                        .map(|c| &c.column_schema.name)
424                        .join(", ")
425                ),
426            }
427        );
428
429        // Checks the time index column is not nullable.
430        ensure!(
431            !self.time_index_column().column_schema.is_nullable(),
432            InvalidMetaSnafu {
433                reason: format!(
434                    "time index column {} must be NOT NULL",
435                    self.time_index_column().column_schema.name
436                ),
437            }
438        );
439
440        if !self.primary_key.is_empty() {
441            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
442            // Checks column ids in the primary key is valid.
443            for column_id in &self.primary_key {
444                // Checks whether the column id exists.
445                ensure!(
446                    id_names.contains_key(column_id),
447                    InvalidMetaSnafu {
448                        reason: format!("unknown column id {}", column_id),
449                    }
450                );
451
452                // Safety: Column with specific id must exist.
453                let column = self.column_by_id(*column_id).unwrap();
454                // Checks duplicate.
455                ensure!(
456                    !pk_ids.contains(&column_id),
457                    InvalidMetaSnafu {
458                        reason: format!(
459                            "duplicate column {} in primary key",
460                            column.column_schema.name
461                        ),
462                    }
463                );
464
465                // Checks this is not a time index column.
466                ensure!(
467                    *column_id != self.time_index,
468                    InvalidMetaSnafu {
469                        reason: format!(
470                            "column {} is already a time index column",
471                            column.column_schema.name,
472                        ),
473                    }
474                );
475
476                // Checks semantic type.
477                ensure!(
478                    column.semantic_type == SemanticType::Tag,
479                    InvalidMetaSnafu {
480                        reason: format!(
481                            "semantic type of column {} should be Tag, not {:?}",
482                            column.column_schema.name, column.semantic_type
483                        ),
484                    }
485                );
486
487                pk_ids.insert(column_id);
488            }
489        }
490
491        // Checks tag semantic type.
492        let num_tag = self
493            .column_metadatas
494            .iter()
495            .filter(|col| col.semantic_type == SemanticType::Tag)
496            .count();
497        ensure!(
498            num_tag == self.primary_key.len(),
499            InvalidMetaSnafu {
500                reason: format!(
501                    "number of primary key columns {} not equal to tag columns {}",
502                    self.primary_key.len(),
503                    num_tag
504                ),
505            }
506        );
507
508        Ok(())
509    }
510
511    /// Checks whether it is a valid column.
512    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
513        if column_metadata.semantic_type == SemanticType::Timestamp {
514            ensure!(
515                column_metadata.column_schema.data_type.is_timestamp(),
516                InvalidMetaSnafu {
517                    reason: format!(
518                        "column `{}` is not timestamp type",
519                        column_metadata.column_schema.name
520                    ),
521                }
522            );
523        }
524
525        ensure!(
526            !is_internal_column(&column_metadata.column_schema.name),
527            InvalidMetaSnafu {
528                reason: format!(
529                    "{} is internal column name that can not be used",
530                    column_metadata.column_schema.name
531                ),
532            }
533        );
534
535        Ok(())
536    }
537}
538
/// Builder to build [RegionMetadata].
///
/// The builder only holds the fields of [RegionMetadata] that are serialized.
pub struct RegionMetadataBuilder {
    /// Id of the region.
    region_id: RegionId,
    /// Columns of the region, in order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns, in order.
    primary_key: Vec<ColumnId>,
    /// Version of the region schema.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Partition expression in JSON string form, if any.
    partition_expr: Option<String>,
}
548
549impl RegionMetadataBuilder {
550    /// Returns a new builder.
551    pub fn new(id: RegionId) -> Self {
552        Self {
553            region_id: id,
554            column_metadatas: vec![],
555            primary_key: vec![],
556            schema_version: 0,
557            primary_key_encoding: PrimaryKeyEncoding::Dense,
558            partition_expr: None,
559        }
560    }
561
562    /// Creates a builder from existing [RegionMetadata].
563    pub fn from_existing(existing: RegionMetadata) -> Self {
564        Self {
565            column_metadatas: existing.column_metadatas,
566            primary_key: existing.primary_key,
567            region_id: existing.region_id,
568            schema_version: existing.schema_version,
569            primary_key_encoding: existing.primary_key_encoding,
570            partition_expr: existing.partition_expr,
571        }
572    }
573
574    /// Sets the primary key encoding mode.
575    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
576        self.primary_key_encoding = encoding;
577        self
578    }
579
580    /// Sets the partition expression in JSON string form.
581    pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
582        self.partition_expr = expr_json;
583        self
584    }
585
586    /// Pushes a new column metadata to this region's metadata.
587    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
588        self.column_metadatas.push(column_metadata);
589        self
590    }
591
592    /// Sets the primary key of the region.
593    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
594        self.primary_key = key;
595        self
596    }
597
598    /// Increases the schema version by 1.
599    pub fn bump_version(&mut self) -> &mut Self {
600        self.schema_version += 1;
601        self
602    }
603
604    /// Applies the alter `kind` to the builder.
605    ///
606    /// The `kind` should be valid.
607    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
608        match kind {
609            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
610            AlterKind::DropColumns { names } => self.drop_columns(&names),
611            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
612            AlterKind::SetIndexes { options } => self.set_indexes(options)?,
613            AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
614            AlterKind::SetRegionOptions { options: _ } => {
615                // nothing to be done with RegionMetadata
616            }
617            AlterKind::UnsetRegionOptions { keys: _ } => {
618                // nothing to be done with RegionMetadata
619            }
620            AlterKind::DropDefaults { names } => {
621                self.drop_defaults(names)?;
622            }
623            AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
624            AlterKind::SyncColumns { column_metadatas } => {
625                self.primary_key = column_metadatas
626                    .iter()
627                    .filter_map(|column_metadata| {
628                        if column_metadata.semantic_type == SemanticType::Tag {
629                            Some(column_metadata.column_id)
630                        } else {
631                            None
632                        }
633                    })
634                    .collect::<Vec<_>>();
635                self.column_metadatas = column_metadatas;
636            }
637        }
638        Ok(self)
639    }
640
641    /// Consumes the builder and build a [RegionMetadata].
642    pub fn build(self) -> Result<RegionMetadata> {
643        let skipped = SkippedFields::new(&self.column_metadatas)?;
644
645        let meta = RegionMetadata {
646            schema: skipped.schema,
647            time_index: skipped.time_index,
648            id_to_index: skipped.id_to_index,
649            column_metadatas: self.column_metadatas,
650            primary_key: self.primary_key,
651            region_id: self.region_id,
652            schema_version: self.schema_version,
653            primary_key_encoding: self.primary_key_encoding,
654            partition_expr: self.partition_expr,
655        };
656
657        meta.validate()?;
658
659        Ok(meta)
660    }
661
662    /// Adds columns to the metadata if not exist.
663    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
664        let mut names: HashSet<_> = self
665            .column_metadatas
666            .iter()
667            .map(|col| col.column_schema.name.clone())
668            .collect();
669
670        for add_column in columns {
671            if names.contains(&add_column.column_metadata.column_schema.name) {
672                // Column already exists.
673                continue;
674            }
675
676            let column_id = add_column.column_metadata.column_id;
677            let semantic_type = add_column.column_metadata.semantic_type;
678            let column_name = add_column.column_metadata.column_schema.name.clone();
679            match add_column.location {
680                None => {
681                    self.column_metadatas.push(add_column.column_metadata);
682                }
683                Some(AddColumnLocation::First) => {
684                    self.column_metadatas.insert(0, add_column.column_metadata);
685                }
686                Some(AddColumnLocation::After { column_name }) => {
687                    let pos = self
688                        .column_metadatas
689                        .iter()
690                        .position(|col| col.column_schema.name == column_name)
691                        .context(InvalidRegionRequestSnafu {
692                            region_id: self.region_id,
693                            err: format!(
694                                "column {} not found, failed to add column {} after it",
695                                column_name, add_column.column_metadata.column_schema.name
696                            ),
697                        })?;
698                    // Insert after pos.
699                    self.column_metadatas
700                        .insert(pos + 1, add_column.column_metadata);
701                }
702            }
703            names.insert(column_name);
704            if semantic_type == SemanticType::Tag {
705                // For a new tag, we extend the primary key.
706                self.primary_key.push(column_id);
707            }
708        }
709
710        Ok(())
711    }
712
713    /// Drops columns from the metadata if exist.
714    fn drop_columns(&mut self, names: &[String]) {
715        let name_set: HashSet<_> = names.iter().collect();
716        self.column_metadatas
717            .retain(|col| !name_set.contains(&col.column_schema.name));
718    }
719
720    /// Changes columns type to the metadata if exist.
721    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
722        let mut change_type_map: HashMap<_, _> = columns
723            .into_iter()
724            .map(
725                |ModifyColumnType {
726                     column_name,
727                     target_type,
728                 }| (column_name, target_type),
729            )
730            .collect();
731
732        for column_meta in self.column_metadatas.iter_mut() {
733            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
734                column_meta.column_schema.data_type = target_type.clone();
735                // also cast default value to target_type if default value exist
736                let new_default =
737                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
738                        Some(
739                            default_value
740                                .cast_to_datatype(&target_type)
741                                .with_context(|_| CastDefaultValueSnafu {
742                                    reason: format!(
743                                        "Failed to cast default value from {:?} to type {:?}",
744                                        default_value, target_type
745                                    ),
746                                })?,
747                        )
748                    } else {
749                        None
750                    };
751                column_meta.column_schema = column_meta
752                    .column_schema
753                    .clone()
754                    .with_default_constraint(new_default.clone())
755                    .with_context(|_| CastDefaultValueSnafu {
756                        reason: format!("Failed to set new default: {:?}", new_default),
757                    })?;
758            }
759        }
760
761        Ok(())
762    }
763
764    fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
765        let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
766        for option in &options {
767            set_index_map
768                .entry(option.column_name())
769                .or_default()
770                .push(option);
771        }
772
773        for column_metadata in self.column_metadatas.iter_mut() {
774            if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
775                for option in options {
776                    Self::set_index(column_metadata, option)?;
777                }
778            }
779        }
780
781        Ok(())
782    }
783
784    fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
785        let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
786        for option in &options {
787            unset_index_map
788                .entry(option.column_name())
789                .or_default()
790                .push(option);
791        }
792
793        for column_metadata in self.column_metadatas.iter_mut() {
794            if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
795                for option in options {
796                    Self::unset_index(column_metadata, option)?;
797                }
798            }
799        }
800
801        Ok(())
802    }
803
804    fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
805        match options {
806            SetIndexOption::Fulltext {
807                column_name,
808                options,
809            } => {
810                ensure!(
811                    column_metadata.column_schema.data_type.is_string(),
812                    InvalidColumnOptionSnafu {
813                        column_name,
814                        msg: "FULLTEXT index only supports string type".to_string(),
815                    }
816                );
817                let current_fulltext_options = column_metadata
818                    .column_schema
819                    .fulltext_options()
820                    .with_context(|_| GetFulltextOptionsSnafu {
821                        column_name: column_name.to_string(),
822                    })?;
823                set_column_fulltext_options(
824                    column_metadata,
825                    column_name,
826                    options,
827                    current_fulltext_options,
828                )?;
829            }
830            SetIndexOption::Inverted { .. } => {
831                column_metadata.column_schema.set_inverted_index(true)
832            }
833            SetIndexOption::Skipping {
834                column_name,
835                options,
836            } => {
837                column_metadata
838                    .column_schema
839                    .set_skipping_options(options)
840                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
841            }
842        }
843
844        Ok(())
845    }
846
847    fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
848        match options {
849            UnsetIndexOption::Fulltext { column_name } => {
850                ensure!(
851                    column_metadata.column_schema.data_type.is_string(),
852                    InvalidColumnOptionSnafu {
853                        column_name,
854                        msg: "FULLTEXT index only supports string type".to_string(),
855                    }
856                );
857
858                let current_fulltext_options = column_metadata
859                    .column_schema
860                    .fulltext_options()
861                    .with_context(|_| GetFulltextOptionsSnafu {
862                        column_name: column_name.to_string(),
863                    })?;
864
865                unset_column_fulltext_options(
866                    column_metadata,
867                    column_name,
868                    current_fulltext_options,
869                )?;
870            }
871            UnsetIndexOption::Inverted { .. } => {
872                column_metadata.column_schema.set_inverted_index(false)
873            }
874            UnsetIndexOption::Skipping { column_name } => {
875                column_metadata
876                    .column_schema
877                    .unset_skipping_options()
878                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
879            }
880        }
881
882        Ok(())
883    }
884
885    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
886        for name in column_names.iter() {
887            let meta = self
888                .column_metadatas
889                .iter_mut()
890                .find(|col| col.column_schema.name == *name);
891            if let Some(meta) = meta {
892                if !meta.column_schema.is_nullable() {
893                    return InvalidRegionRequestSnafu {
894                        region_id: self.region_id,
895                        err: format!(
896                            "column {name} is not nullable and `default` cannot be dropped",
897                        ),
898                    }
899                    .fail();
900                }
901                meta.column_schema = meta
902                    .column_schema
903                    .clone()
904                    .with_default_constraint(None)
905                    .with_context(|_| CastDefaultValueSnafu {
906                        reason: format!("Failed to drop default : {name:?}"),
907                    })?;
908            } else {
909                return InvalidRegionRequestSnafu {
910                    region_id: self.region_id,
911                    err: format!("column {name} not found",),
912                }
913                .fail();
914            }
915        }
916        Ok(())
917    }
918
919    fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
920        for set_default in set_defaults.iter() {
921            let meta = self
922                .column_metadatas
923                .iter_mut()
924                .find(|col| col.column_schema.name == set_default.name);
925            if let Some(meta) = meta {
926                let default_constraint = common_sql::convert::deserialize_default_constraint(
927                    set_default.default_constraint.as_slice(),
928                    &meta.column_schema.name,
929                    &meta.column_schema.data_type,
930                )
931                .context(SqlCommonSnafu)?;
932
933                meta.column_schema = meta
934                    .column_schema
935                    .clone()
936                    .with_default_constraint(default_constraint)
937                    .with_context(|_| CastDefaultValueSnafu {
938                        reason: format!("Failed to set default : {set_default:?}"),
939                    })?;
940            } else {
941                return InvalidRegionRequestSnafu {
942                    region_id: self.region_id,
943                    err: format!("column {} not found", set_default.name),
944                }
945                .fail();
946            }
947        }
948        Ok(())
949    }
950}
951
952/// Fields skipped in serialization.
953struct SkippedFields {
954    /// Last schema.
955    schema: SchemaRef,
956    /// Id of the time index column.
957    time_index: ColumnId,
958    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
959    id_to_index: HashMap<ColumnId, usize>,
960}
961
962impl SkippedFields {
963    /// Constructs skipped fields from `column_metadatas`.
964    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
965        let column_schemas = column_metadatas
966            .iter()
967            .map(|column_metadata| column_metadata.column_schema.clone())
968            .collect();
969        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
970        let time_index = column_metadatas
971            .iter()
972            .find_map(|col| {
973                if col.semantic_type == SemanticType::Timestamp {
974                    Some(col.column_id)
975                } else {
976                    None
977                }
978            })
979            .context(InvalidMetaSnafu {
980                reason: "time index not found",
981            })?;
982        let id_to_index = column_metadatas
983            .iter()
984            .enumerate()
985            .map(|(idx, col)| (col.column_id, idx))
986            .collect();
987
988        Ok(SkippedFields {
989            schema,
990            time_index,
991            id_to_index,
992        })
993    }
994}
995
996#[derive(Snafu)]
997#[snafu(visibility(pub))]
998#[stack_trace_debug]
999pub enum MetadataError {
1000    #[snafu(display("Invalid schema"))]
1001    InvalidSchema {
1002        source: datatypes::error::Error,
1003        #[snafu(implicit)]
1004        location: Location,
1005    },
1006
1007    #[snafu(display("Invalid metadata, {}", reason))]
1008    InvalidMeta {
1009        reason: String,
1010        #[snafu(implicit)]
1011        location: Location,
1012    },
1013
1014    #[snafu(display("Failed to ser/de json object"))]
1015    SerdeJson {
1016        #[snafu(implicit)]
1017        location: Location,
1018        #[snafu(source)]
1019        error: serde_json::Error,
1020    },
1021
1022    #[snafu(display("Invalid raw region request, err: {}", err))]
1023    InvalidRawRegionRequest {
1024        err: String,
1025        #[snafu(implicit)]
1026        location: Location,
1027    },
1028
1029    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1030    InvalidRegionRequest {
1031        region_id: RegionId,
1032        err: String,
1033        #[snafu(implicit)]
1034        location: Location,
1035    },
1036
1037    #[snafu(display("Unexpected schema error during project"))]
1038    SchemaProject {
1039        origin_schema: SchemaRef,
1040        projection: Vec<ColumnId>,
1041        #[snafu(implicit)]
1042        location: Location,
1043        source: datatypes::Error,
1044    },
1045
1046    #[snafu(display("Time index column not found"))]
1047    TimeIndexNotFound {
1048        #[snafu(implicit)]
1049        location: Location,
1050    },
1051
1052    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1053    ChangeColumnNotFound {
1054        column_name: String,
1055        region_id: RegionId,
1056        #[snafu(implicit)]
1057        location: Location,
1058    },
1059
1060    #[snafu(display("Failed to convert column schema"))]
1061    ConvertColumnSchema {
1062        source: api::error::Error,
1063        #[snafu(implicit)]
1064        location: Location,
1065    },
1066
1067    #[snafu(display("Failed to convert TimeRanges"))]
1068    ConvertTimeRanges {
1069        source: api::error::Error,
1070        #[snafu(implicit)]
1071        location: Location,
1072    },
1073
1074    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1075    InvalidSetRegionOptionRequest {
1076        key: String,
1077        value: String,
1078        #[snafu(implicit)]
1079        location: Location,
1080    },
1081
1082    #[snafu(display("Invalid set region option request, key: {}", key))]
1083    InvalidUnsetRegionOptionRequest {
1084        key: String,
1085        #[snafu(implicit)]
1086        location: Location,
1087    },
1088
1089    #[snafu(display("Failed to decode protobuf"))]
1090    DecodeProto {
1091        #[snafu(source)]
1092        error: prost::UnknownEnumValue,
1093        #[snafu(implicit)]
1094        location: Location,
1095    },
1096
1097    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1098    InvalidColumnOption {
1099        column_name: String,
1100        msg: String,
1101        #[snafu(implicit)]
1102        location: Location,
1103    },
1104
1105    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1106    SetFulltextOptions {
1107        column_name: String,
1108        source: datatypes::Error,
1109        #[snafu(implicit)]
1110        location: Location,
1111    },
1112
1113    #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1114    GetFulltextOptions {
1115        column_name: String,
1116        source: datatypes::Error,
1117        #[snafu(implicit)]
1118        location: Location,
1119    },
1120
1121    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1122    SetSkippingIndexOptions {
1123        column_name: String,
1124        source: datatypes::Error,
1125        #[snafu(implicit)]
1126        location: Location,
1127    },
1128
1129    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1130    UnsetSkippingIndexOptions {
1131        column_name: String,
1132        source: datatypes::Error,
1133        #[snafu(implicit)]
1134        location: Location,
1135    },
1136
1137    #[snafu(display("Failed to decode arrow ipc record batches"))]
1138    DecodeArrowIpc {
1139        #[snafu(source)]
1140        error: arrow::error::ArrowError,
1141        #[snafu(implicit)]
1142        location: Location,
1143    },
1144
1145    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1146    CastDefaultValue {
1147        reason: String,
1148        source: datatypes::Error,
1149        #[snafu(implicit)]
1150        location: Location,
1151    },
1152
1153    #[snafu(display("Unexpected: {}", reason))]
1154    Unexpected {
1155        reason: String,
1156        #[snafu(implicit)]
1157        location: Location,
1158    },
1159
1160    #[snafu(display("Failed to encode/decode flight message"))]
1161    FlightCodec {
1162        source: common_grpc::Error,
1163        #[snafu(implicit)]
1164        location: Location,
1165    },
1166
1167    #[snafu(display("Invalid index option"))]
1168    InvalidIndexOption {
1169        #[snafu(implicit)]
1170        location: Location,
1171        #[snafu(source)]
1172        error: datatypes::error::Error,
1173    },
1174
1175    #[snafu(display("Sql common error"))]
1176    SqlCommon {
1177        source: common_sql::error::Error,
1178        #[snafu(implicit)]
1179        location: Location,
1180    },
1181}
1182
1183impl ErrorExt for MetadataError {
1184    fn status_code(&self) -> StatusCode {
1185        match self {
1186            Self::SqlCommon { source, .. } => source.status_code(),
1187            _ => StatusCode::InvalidArguments,
1188        }
1189    }
1190
1191    fn as_any(&self) -> &dyn Any {
1192        self
1193    }
1194}
1195
1196/// Set column fulltext options if it passed the validation.
1197///
1198/// Options allowed to modify:
1199/// * backend
1200///
1201/// Options not allowed to modify:
1202/// * analyzer
1203/// * case_sensitive
1204fn set_column_fulltext_options(
1205    column_meta: &mut ColumnMetadata,
1206    column_name: &str,
1207    options: &FulltextOptions,
1208    current_options: Option<FulltextOptions>,
1209) -> Result<()> {
1210    if let Some(current_options) = current_options {
1211        ensure!(
1212            current_options.analyzer == options.analyzer
1213                && current_options.case_sensitive == options.case_sensitive,
1214            InvalidColumnOptionSnafu {
1215                column_name,
1216                msg: format!(
1217                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1218                    current_options.analyzer, current_options.case_sensitive
1219                ),
1220            }
1221        );
1222    }
1223
1224    column_meta
1225        .column_schema
1226        .set_fulltext_options(options)
1227        .context(SetFulltextOptionsSnafu { column_name })?;
1228
1229    Ok(())
1230}
1231
1232fn unset_column_fulltext_options(
1233    column_meta: &mut ColumnMetadata,
1234    column_name: &str,
1235    current_options: Option<FulltextOptions>,
1236) -> Result<()> {
1237    if let Some(mut current_options) = current_options
1238        && current_options.enable
1239    {
1240        current_options.enable = false;
1241        column_meta
1242            .column_schema
1243            .set_fulltext_options(&current_options)
1244            .context(SetFulltextOptionsSnafu { column_name })?;
1245    } else {
1246        return InvalidColumnOptionSnafu {
1247            column_name,
1248            msg: "FULLTEXT index already disabled",
1249        }
1250        .fail();
1251    }
1252
1253    Ok(())
1254}
1255
1256#[cfg(test)]
1257mod test {
1258    use datatypes::prelude::ConcreteDataType;
1259    use datatypes::schema::{
1260        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1261    };
1262    use datatypes::value::Value;
1263
1264    use super::*;
1265
1266    fn create_builder() -> RegionMetadataBuilder {
1267        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1268    }
1269
1270    fn build_test_region_metadata() -> RegionMetadata {
1271        let mut builder = create_builder();
1272        builder
1273            .push_column_metadata(ColumnMetadata {
1274                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1275                semantic_type: SemanticType::Tag,
1276                column_id: 1,
1277            })
1278            .push_column_metadata(ColumnMetadata {
1279                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1280                semantic_type: SemanticType::Field,
1281                column_id: 2,
1282            })
1283            .push_column_metadata(ColumnMetadata {
1284                column_schema: ColumnSchema::new(
1285                    "c",
1286                    ConcreteDataType::timestamp_millisecond_datatype(),
1287                    false,
1288                ),
1289                semantic_type: SemanticType::Timestamp,
1290                column_id: 3,
1291            })
1292            .primary_key(vec![1])
1293            .partition_expr_json(Some("".to_string()));
1294        builder.build().unwrap()
1295    }
1296
#[test]
fn test_region_metadata() {
    let metadata = build_test_region_metadata();

    // `c` is the time index of the shared test metadata.
    let time_index = metadata.time_index_column();
    assert_eq!("c", time_index.column_schema.name);

    // Lookup by id finds existing columns and returns `None` for unknown ids.
    let column_a = metadata.column_by_id(1).unwrap();
    assert_eq!("a", column_a.column_schema.name);
    assert_eq!(None, metadata.column_by_id(10));
}
1307
#[test]
fn test_region_metadata_serde() {
    // A JSON round trip must give back equal metadata.
    let original = build_test_region_metadata();
    let json = serde_json::to_string(&original).unwrap();
    let restored = serde_json::from_str::<RegionMetadata>(&json).unwrap();
    assert_eq!(original, restored);
}
1315
#[test]
fn test_column_metadata_validate() {
    // A time index column declared with a string data type must be rejected.
    let mut builder = create_builder();
    builder.push_column_metadata(ColumnMetadata {
        column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("column `ts` is not timestamp type"),
        "unexpected err: {err}",
    );
}
1333
#[test]
fn test_empty_region_metadata() {
    // No column is pushed, so the build cannot find a time index.
    let err = create_builder().build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("time index not found"),
        "unexpected err: {err}",
    );
}
1344
#[test]
fn test_same_column_id() {
    // Both columns use column id 1.
    let tag_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index_b = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder
        .push_column_metadata(tag_a)
        .push_column_metadata(time_index_b);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("column a and b have the same column id"),
        "unexpected err: {err}",
    );
}
1370
#[test]
fn test_duplicate_time_index() {
    // Two columns both claim the time index role.
    let time_index_column = |name: &str, column_id: ColumnId| ColumnMetadata {
        column_schema: ColumnSchema::new(
            name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id,
    };

    let mut builder = create_builder();
    builder
        .push_column_metadata(time_index_column("a", 1))
        .push_column_metadata(time_index_column("b", 2));

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("expect only one time index"),
        "unexpected err: {err}",
    );
}
1399
#[test]
fn test_unknown_primary_key() {
    let tag_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index_b = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    // Column id 3 is not defined by any pushed column.
    let mut builder = create_builder();
    builder
        .push_column_metadata(tag_a)
        .push_column_metadata(time_index_b)
        .primary_key(vec![3]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("unknown column id 3"),
        "unexpected err: {err}",
    );
}
1425
#[test]
fn test_same_primary_key() {
    let tag_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index_b = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    // Column id 1 is listed twice in the primary key.
    let mut builder = create_builder();
    builder
        .push_column_metadata(tag_a)
        .push_column_metadata(time_index_b)
        .primary_key(vec![1, 1]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("duplicate column a in primary key"),
        "unexpected err: {err}",
    );
}
1452
#[test]
fn test_in_time_index() {
    // The time index column is also listed in the primary key.
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index).primary_key(vec![1]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("column ts is already a time index column"),
        "unexpected err: {err}",
    );
}
1474
#[test]
fn test_nullable_time_index() {
    // The time index column is declared nullable.
    let nullable_time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(nullable_time_index);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("time index column ts must be NOT NULL"),
        "unexpected err: {err}",
    );
}
1494
#[test]
fn test_primary_key_semantic_type() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };
    let field_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
        semantic_type: SemanticType::Field,
        column_id: 2,
    };

    // The primary key points at a field column instead of a tag column.
    let mut builder = create_builder();
    builder
        .push_column_metadata(time_index)
        .push_column_metadata(field_a)
        .primary_key(vec![2]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("semantic type of column a should be Tag, not Field"),
        "unexpected err: {err}",
    );
}
1521
#[test]
fn test_primary_key_tag_num() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };
    let string_tag = |name: &str, column_id: ColumnId| ColumnMetadata {
        column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
        semantic_type: SemanticType::Tag,
        column_id,
    };

    // Two tag columns exist but only one of them is in the primary key.
    let mut builder = create_builder();
    builder
        .push_column_metadata(time_index)
        .push_column_metadata(string_tag("a", 2))
        .push_column_metadata(string_tag("b", 3))
        .primary_key(vec![2]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("number of primary key columns 1 not equal to tag columns 2"),
        "unexpected err: {err}",
    );
}
1553
#[test]
fn test_bump_version() {
    let mut expected = build_test_region_metadata();

    let mut builder = RegionMetadataBuilder::from_existing(expected.clone());
    builder.bump_version();
    let bumped = builder.build().unwrap();

    // Apart from the schema version going up by one, the metadata must be unchanged.
    expected.schema_version += 1;
    assert_eq!(expected, bumped);
}
1563
1564    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1565        let semantic_type = if is_tag {
1566            SemanticType::Tag
1567        } else {
1568            SemanticType::Field
1569        };
1570        ColumnMetadata {
1571            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1572            semantic_type,
1573            column_id,
1574        }
1575    }
1576
1577    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1578        let actual: Vec<_> = metadata
1579            .column_metadatas
1580            .iter()
1581            .map(|col| &col.column_schema.name)
1582            .collect();
1583        assert_eq!(names, actual);
1584    }
1585
1586    fn get_columns_default_constraint(
1587        metadata: &RegionMetadata,
1588        name: String,
1589    ) -> Option<Option<&ColumnDefaultConstraint>> {
1590        metadata.column_metadatas.iter().find_map(|col| {
1591            if col.column_schema.name == name {
1592                Some(col.column_schema.default_constraint())
1593            } else {
1594                None
1595            }
1596        })
1597    }
1598
1599    #[test]
1600    fn test_alter() {
1601        // a (tag), b (field), c (ts)
1602        let metadata = build_test_region_metadata();
1603        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1604        // tag d
1605        builder
1606            .alter(AlterKind::AddColumns {
1607                columns: vec![AddColumn {
1608                    column_metadata: new_column_metadata("d", true, 4),
1609                    location: None,
1610                }],
1611            })
1612            .unwrap();
1613        let metadata = builder.build().unwrap();
1614        check_columns(&metadata, &["a", "b", "c", "d"]);
1615        assert_eq!([1, 4], &metadata.primary_key[..]);
1616
1617        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1618        builder
1619            .alter(AlterKind::AddColumns {
1620                columns: vec![AddColumn {
1621                    column_metadata: new_column_metadata("e", false, 5),
1622                    location: Some(AddColumnLocation::First),
1623                }],
1624            })
1625            .unwrap();
1626        let metadata = builder.build().unwrap();
1627        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1628
1629        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1630        builder
1631            .alter(AlterKind::AddColumns {
1632                columns: vec![AddColumn {
1633                    column_metadata: new_column_metadata("f", false, 6),
1634                    location: Some(AddColumnLocation::After {
1635                        column_name: "b".to_string(),
1636                    }),
1637                }],
1638            })
1639            .unwrap();
1640        let metadata = builder.build().unwrap();
1641        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1642
1643        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1644        builder
1645            .alter(AlterKind::AddColumns {
1646                columns: vec![AddColumn {
1647                    column_metadata: new_column_metadata("g", false, 7),
1648                    location: Some(AddColumnLocation::After {
1649                        column_name: "d".to_string(),
1650                    }),
1651                }],
1652            })
1653            .unwrap();
1654        let metadata = builder.build().unwrap();
1655        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1656
1657        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1658        builder
1659            .alter(AlterKind::DropColumns {
1660                names: vec!["g".to_string(), "e".to_string()],
1661            })
1662            .unwrap();
1663        let metadata = builder.build().unwrap();
1664        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1665
1666        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1667        builder
1668            .alter(AlterKind::DropColumns {
1669                names: vec!["a".to_string()],
1670            })
1671            .unwrap();
1672        // Build returns error as the primary key contains a.
1673        let err = builder.build().unwrap_err();
1674        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1675
1676        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1677        let mut column_metadata = new_column_metadata("g", false, 8);
1678        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1679        column_metadata.column_schema = column_metadata
1680            .column_schema
1681            .with_default_constraint(default_constraint.clone())
1682            .unwrap();
1683        builder
1684            .alter(AlterKind::AddColumns {
1685                columns: vec![AddColumn {
1686                    column_metadata,
1687                    location: None,
1688                }],
1689            })
1690            .unwrap();
1691        let metadata = builder.build().unwrap();
1692        assert_eq!(
1693            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1694            default_constraint.as_ref()
1695        );
1696        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1697
1698        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1699        builder
1700            .alter(AlterKind::DropDefaults {
1701                names: vec!["g".to_string()],
1702            })
1703            .unwrap();
1704        let metadata = builder.build().unwrap();
1705        assert_eq!(
1706            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1707            None
1708        );
1709        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1710
1711        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1712        builder
1713            .alter(AlterKind::DropColumns {
1714                names: vec!["g".to_string()],
1715            })
1716            .unwrap();
1717        let metadata = builder.build().unwrap();
1718        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1719
1720        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1721        builder
1722            .alter(AlterKind::ModifyColumnTypes {
1723                columns: vec![ModifyColumnType {
1724                    column_name: "b".to_string(),
1725                    target_type: ConcreteDataType::string_datatype(),
1726                }],
1727            })
1728            .unwrap();
1729        let metadata = builder.build().unwrap();
1730        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1731        let b_type = &metadata
1732            .column_by_name("b")
1733            .unwrap()
1734            .column_schema
1735            .data_type;
1736        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1737
1738        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1739        builder
1740            .alter(AlterKind::SetIndexes {
1741                options: vec![SetIndexOption::Fulltext {
1742                    column_name: "b".to_string(),
1743                    options: FulltextOptions::new_unchecked(
1744                        true,
1745                        FulltextAnalyzer::Chinese,
1746                        true,
1747                        FulltextBackend::Bloom,
1748                        1000,
1749                        0.01,
1750                    ),
1751                }],
1752            })
1753            .unwrap();
1754        let metadata = builder.build().unwrap();
1755        let a_fulltext_options = metadata
1756            .column_by_name("b")
1757            .unwrap()
1758            .column_schema
1759            .fulltext_options()
1760            .unwrap()
1761            .unwrap();
1762        assert!(a_fulltext_options.enable);
1763        assert_eq!(
1764            datatypes::schema::FulltextAnalyzer::Chinese,
1765            a_fulltext_options.analyzer
1766        );
1767        assert!(a_fulltext_options.case_sensitive);
1768
1769        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1770        builder
1771            .alter(AlterKind::UnsetIndexes {
1772                options: vec![UnsetIndexOption::Fulltext {
1773                    column_name: "b".to_string(),
1774                }],
1775            })
1776            .unwrap();
1777        let metadata = builder.build().unwrap();
1778        let a_fulltext_options = metadata
1779            .column_by_name("b")
1780            .unwrap()
1781            .column_schema
1782            .fulltext_options()
1783            .unwrap()
1784            .unwrap();
1785        assert!(!a_fulltext_options.enable);
1786        assert_eq!(
1787            datatypes::schema::FulltextAnalyzer::Chinese,
1788            a_fulltext_options.analyzer
1789        );
1790        assert!(a_fulltext_options.case_sensitive);
1791    }
1792
1793    #[test]
1794    fn test_add_if_not_exists() {
1795        // a (tag), b (field), c (ts)
1796        let metadata = build_test_region_metadata();
1797        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1798        // tag d
1799        builder
1800            .alter(AlterKind::AddColumns {
1801                columns: vec![
1802                    AddColumn {
1803                        column_metadata: new_column_metadata("d", true, 4),
1804                        location: None,
1805                    },
1806                    AddColumn {
1807                        column_metadata: new_column_metadata("d", true, 4),
1808                        location: None,
1809                    },
1810                ],
1811            })
1812            .unwrap();
1813        let metadata = builder.build().unwrap();
1814        check_columns(&metadata, &["a", "b", "c", "d"]);
1815        assert_eq!([1, 4], &metadata.primary_key[..]);
1816
1817        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1818        // field b.
1819        builder
1820            .alter(AlterKind::AddColumns {
1821                columns: vec![AddColumn {
1822                    column_metadata: new_column_metadata("b", false, 2),
1823                    location: None,
1824                }],
1825            })
1826            .unwrap();
1827        let metadata = builder.build().unwrap();
1828        check_columns(&metadata, &["a", "b", "c", "d"]);
1829    }
1830
1831    #[test]
1832    fn test_add_column_with_inverted_index() {
1833        // only set inverted index to true explicitly will this column be inverted indexed
1834
1835        // a (tag), b (field), c (ts)
1836        let metadata = build_test_region_metadata();
1837        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1838        // tag d, e
1839        let mut col = new_column_metadata("d", true, 4);
1840        col.column_schema.set_inverted_index(true);
1841        builder
1842            .alter(AlterKind::AddColumns {
1843                columns: vec![
1844                    AddColumn {
1845                        column_metadata: col,
1846                        location: None,
1847                    },
1848                    AddColumn {
1849                        column_metadata: new_column_metadata("e", true, 5),
1850                        location: None,
1851                    },
1852                ],
1853            })
1854            .unwrap();
1855        let metadata = builder.build().unwrap();
1856        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1857        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1858        let column_metadata = metadata.column_by_name("a").unwrap();
1859        assert!(!column_metadata.column_schema.is_inverted_indexed());
1860        let column_metadata = metadata.column_by_name("b").unwrap();
1861        assert!(!column_metadata.column_schema.is_inverted_indexed());
1862        let column_metadata = metadata.column_by_name("c").unwrap();
1863        assert!(!column_metadata.column_schema.is_inverted_indexed());
1864        let column_metadata = metadata.column_by_name("d").unwrap();
1865        assert!(column_metadata.column_schema.is_inverted_indexed());
1866        let column_metadata = metadata.column_by_name("e").unwrap();
1867        assert!(!column_metadata.column_schema.is_inverted_indexed());
1868    }
1869
1870    #[test]
1871    fn test_drop_if_exists() {
1872        // a (tag), b (field), c (ts)
1873        let metadata = build_test_region_metadata();
1874        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1875        // field d, e
1876        builder
1877            .alter(AlterKind::AddColumns {
1878                columns: vec![
1879                    AddColumn {
1880                        column_metadata: new_column_metadata("d", false, 4),
1881                        location: None,
1882                    },
1883                    AddColumn {
1884                        column_metadata: new_column_metadata("e", false, 5),
1885                        location: None,
1886                    },
1887                ],
1888            })
1889            .unwrap();
1890        let metadata = builder.build().unwrap();
1891        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1892
1893        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1894        builder
1895            .alter(AlterKind::DropColumns {
1896                names: vec!["b".to_string(), "b".to_string()],
1897            })
1898            .unwrap();
1899        let metadata = builder.build().unwrap();
1900        check_columns(&metadata, &["a", "c", "d", "e"]);
1901
1902        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1903        builder
1904            .alter(AlterKind::DropColumns {
1905                names: vec!["b".to_string(), "e".to_string()],
1906            })
1907            .unwrap();
1908        let metadata = builder.build().unwrap();
1909        check_columns(&metadata, &["a", "c", "d"]);
1910    }
1911
1912    #[test]
1913    fn test_invalid_column_name() {
1914        let mut builder = create_builder();
1915        builder.push_column_metadata(ColumnMetadata {
1916            column_schema: ColumnSchema::new(
1917                "__sequence",
1918                ConcreteDataType::timestamp_millisecond_datatype(),
1919                false,
1920            ),
1921            semantic_type: SemanticType::Timestamp,
1922            column_id: 1,
1923        });
1924        let err = builder.build().unwrap_err();
1925        assert!(
1926            err.to_string()
1927                .contains("internal column name that can not be used"),
1928            "unexpected err: {err}",
1929        );
1930    }
1931
1932    #[test]
1933    fn test_debug_for_column_metadata() {
1934        let region_metadata = build_test_region_metadata();
1935        let formatted = format!("{:?}", region_metadata);
1936        assert_eq!(
1937            formatted,
1938            "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
1939        );
1940    }
1941
1942    #[test]
1943    fn test_region_metadata_deserialize_default_primary_key_encoding() {
1944        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
1945        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1946        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
1947
1948        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
1949        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1950        assert_eq!(
1951            deserialized.primary_key_encoding,
1952            PrimaryKeyEncoding::Sparse
1953        );
1954    }
1955}