store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::column_def::try_as_column_schema;
25use api::v1::region::RegionColumnDef;
26use api::v1::SemanticType;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
33use datatypes::types::TimestampType;
34use serde::de::Error;
35use serde::{Deserialize, Deserializer, Serialize};
36use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
37
38use crate::codec::PrimaryKeyEncoding;
39use crate::region_request::{
40    AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, ApiUnsetIndexOptions,
41    ModifyColumnType,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type used throughout this module, with [MetadataError] as the error type.
pub type Result<T> = std::result::Result<T, MetadataError>;
47
/// Metadata of a column.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Immutable and unique id of a column.
    pub column_id: ColumnId,
}
58
59impl fmt::Debug for ColumnMetadata {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(
62            f,
63            "[{:?} {:?} {:?}]",
64            self.column_schema, self.semantic_type, self.column_id,
65        )
66    }
67}
68
69impl ColumnMetadata {
70    /// Construct `Self` from protobuf struct [RegionColumnDef]
71    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72        let column_id = column_def.column_id;
73        let column_def = column_def
74            .column_def
75            .context(InvalidRawRegionRequestSnafu {
76                err: "column_def is absent",
77            })?;
78        let semantic_type = column_def.semantic_type();
79        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81        Ok(Self {
82            column_schema,
83            semantic_type,
84            column_id,
85        })
86    }
87
88    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
89    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90        serde_json::to_vec(columns)
91    }
92
93    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
94    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95        serde_json::from_slice(bytes)
96    }
97
98    pub fn is_same_datatype(&self, other: &Self) -> bool {
99        self.column_schema.data_type == other.column_schema.data_type
100    }
101}
102
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    pub schema: SchemaRef,

    // We don't pub `time_index` and `id_to_index` and always construct them via [SkippedFields]
    // so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
158
159impl fmt::Debug for RegionMetadata {
160    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
161        f.debug_struct("RegionMetadata")
162            .field("column_metadatas", &self.column_metadatas)
163            .field("time_index", &self.time_index)
164            .field("primary_key", &self.primary_key)
165            .field("region_id", &self.region_id)
166            .field("schema_version", &self.schema_version)
167            .finish()
168    }
169}
170
/// Shared pointer ([Arc]) to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
172
173impl<'de> Deserialize<'de> for RegionMetadata {
174    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
175    where
176        D: Deserializer<'de>,
177    {
178        // helper internal struct for deserialization
179        #[derive(Deserialize)]
180        struct RegionMetadataWithoutSchema {
181            column_metadatas: Vec<ColumnMetadata>,
182            primary_key: Vec<ColumnId>,
183            region_id: RegionId,
184            schema_version: u64,
185            #[serde(default)]
186            primary_key_encoding: PrimaryKeyEncoding,
187        }
188
189        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
190        let skipped =
191            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
192
193        Ok(Self {
194            schema: skipped.schema,
195            time_index: skipped.time_index,
196            id_to_index: skipped.id_to_index,
197            column_metadatas: without_schema.column_metadatas,
198            primary_key: without_schema.primary_key,
199            region_id: without_schema.region_id,
200            schema_version: without_schema.schema_version,
201            primary_key_encoding: without_schema.primary_key_encoding,
202        })
203    }
204}
205
206impl RegionMetadata {
207    /// Decode the metadata from a JSON str.
208    pub fn from_json(s: &str) -> Result<Self> {
209        serde_json::from_str(s).context(SerdeJsonSnafu)
210    }
211
212    /// Encode the metadata to a JSON string.
213    pub fn to_json(&self) -> Result<String> {
214        serde_json::to_string(&self).context(SerdeJsonSnafu)
215    }
216
217    /// Find column by id.
218    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
219        self.id_to_index
220            .get(&column_id)
221            .map(|index| &self.column_metadatas[*index])
222    }
223
224    /// Find column index by id.
225    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
226        self.id_to_index.get(&column_id).copied()
227    }
228
229    /// Find column index by name.
230    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
231        self.column_metadatas
232            .iter()
233            .position(|col| col.column_schema.name == column_name)
234    }
235
236    /// Returns the time index column
237    ///
238    /// # Panics
239    /// Panics if the time index column id is invalid.
240    pub fn time_index_column(&self) -> &ColumnMetadata {
241        let index = self.id_to_index[&self.time_index];
242        &self.column_metadatas[index]
243    }
244
245    /// Returns timestamp type of time index column
246    ///
247    /// # Panics
248    /// Panics if the time index column id is invalid.
249    pub fn time_index_type(&self) -> TimestampType {
250        let index = self.id_to_index[&self.time_index];
251        self.column_metadatas[index]
252            .column_schema
253            .data_type
254            .as_timestamp()
255            .unwrap()
256    }
257
258    /// Returns the position of the time index.
259    pub fn time_index_column_pos(&self) -> usize {
260        self.id_to_index[&self.time_index]
261    }
262
263    /// Returns the arrow field of the time index column.
264    pub fn time_index_field(&self) -> FieldRef {
265        let index = self.id_to_index[&self.time_index];
266        self.schema.arrow_schema().fields[index].clone()
267    }
268
269    /// Finds a column by name.
270    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
271        self.schema
272            .column_index_by_name(name)
273            .map(|index| &self.column_metadatas[index])
274    }
275
276    /// Returns all primary key columns.
277    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
278        // safety: RegionMetadata::validate ensures every primary key exists.
279        self.primary_key
280            .iter()
281            .map(|id| self.column_by_id(*id).unwrap())
282    }
283
284    /// Returns all field columns before projection.
285    ///
286    /// **Use with caution**. On read path where might have projection, this method
287    /// can return columns that not present in data batch.
288    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
289        self.column_metadatas
290            .iter()
291            .filter(|column| column.semantic_type == SemanticType::Field)
292    }
293
294    /// Returns a column's index in primary key if it is a primary key column.
295    ///
296    /// This does a linear search.
297    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
298        self.primary_key.iter().position(|id| *id == column_id)
299    }
300
301    /// Project the metadata to a new one using specified column ids.
302    ///
303    /// [RegionId] and schema version are preserved.
304    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
305        // check time index
306        ensure!(
307            projection.contains(&self.time_index),
308            TimeIndexNotFoundSnafu
309        );
310
311        // prepare new indices
312        let indices_to_preserve = projection
313            .iter()
314            .map(|id| {
315                self.column_index_by_id(*id)
316                    .with_context(|| InvalidRegionRequestSnafu {
317                        region_id: self.region_id,
318                        err: format!("column id {} not found", id),
319                    })
320            })
321            .collect::<Result<Vec<_>>>()?;
322
323        // project schema
324        let projected_schema =
325            self.schema
326                .try_project(&indices_to_preserve)
327                .with_context(|_| SchemaProjectSnafu {
328                    origin_schema: self.schema.clone(),
329                    projection: projection.to_vec(),
330                })?;
331
332        // project columns, generate projected primary key and new id_to_index
333        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
334        let mut projected_primary_key = vec![];
335        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
336        for index in indices_to_preserve {
337            let col = self.column_metadatas[index].clone();
338            if col.semantic_type == SemanticType::Tag {
339                projected_primary_key.push(col.column_id);
340            }
341            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
342            projected_column_metadatas.push(col);
343        }
344
345        Ok(RegionMetadata {
346            schema: Arc::new(projected_schema),
347            time_index: self.time_index,
348            id_to_index: projected_id_to_index,
349            column_metadatas: projected_column_metadatas,
350            primary_key: projected_primary_key,
351            region_id: self.region_id,
352            schema_version: self.schema_version,
353            primary_key_encoding: self.primary_key_encoding,
354        })
355    }
356
357    /// Gets the column ids to be indexed by inverted index.
358    pub fn inverted_indexed_column_ids<'a>(
359        &self,
360        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
361    ) -> HashSet<ColumnId> {
362        let mut inverted_index = self
363            .column_metadatas
364            .iter()
365            .filter(|column| column.column_schema.is_inverted_indexed())
366            .map(|column| column.column_id)
367            .collect::<HashSet<_>>();
368
369        for ignored in ignore_column_ids {
370            inverted_index.remove(ignored);
371        }
372
373        inverted_index
374    }
375
    /// Checks whether the metadata is valid.
    ///
    /// The checks run in this order and the first failure is returned:
    /// 1. every column passes [validate_column_metadata](Self::validate_column_metadata)
    ///    and no two columns share a column id;
    /// 2. exactly one column has the `Timestamp` semantic type;
    /// 3. the time index column is not nullable;
    /// 4. every primary key id refers to an existing column, is not duplicated,
    ///    is not the time index and has the `Tag` semantic type;
    /// 5. the number of `Tag` columns equals the length of the primary key.
    ///
    /// # Errors
    /// Returns an `InvalidMeta` error describing the first violated rule.
    fn validate(&self) -> Result<()> {
        // Id to name.
        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
        for col in &self.column_metadatas {
            // Validate each column.
            Self::validate_column_metadata(col)?;

            // Check whether column id is duplicated. We already check column name
            // is unique in `Schema` so we only check column id here.
            // The indexing in the message is only evaluated when the check fails,
            // i.e. when the id is already present in `id_names`.
            ensure!(
                !id_names.contains_key(&col.column_id),
                InvalidMetaSnafu {
                    reason: format!(
                        "column {} and {} have the same column id {}",
                        id_names[&col.column_id], col.column_schema.name, col.column_id,
                    ),
                }
            );
            id_names.insert(col.column_id, &col.column_schema.name);
        }

        // Checks there is only one time index.
        let num_time_index = self
            .column_metadatas
            .iter()
            .filter(|col| col.semantic_type == SemanticType::Timestamp)
            .count();
        ensure!(
            num_time_index == 1,
            InvalidMetaSnafu {
                reason: format!("expect only one time index, found {}", num_time_index),
            }
        );

        // Checks the time index column is not nullable.
        ensure!(
            !self.time_index_column().column_schema.is_nullable(),
            InvalidMetaSnafu {
                reason: format!(
                    "time index column {} must be NOT NULL",
                    self.time_index_column().column_schema.name
                ),
            }
        );

        if !self.primary_key.is_empty() {
            // Ids already seen in the primary key, used to detect duplicates.
            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
            // Checks column ids in the primary key is valid.
            for column_id in &self.primary_key {
                // Checks whether the column id exists.
                ensure!(
                    id_names.contains_key(column_id),
                    InvalidMetaSnafu {
                        reason: format!("unknown column id {}", column_id),
                    }
                );

                // Safety: Column with specific id must exist.
                let column = self.column_by_id(*column_id).unwrap();
                // Checks duplicate.
                ensure!(
                    !pk_ids.contains(&column_id),
                    InvalidMetaSnafu {
                        reason: format!(
                            "duplicate column {} in primary key",
                            column.column_schema.name
                        ),
                    }
                );

                // Checks this is not a time index column.
                ensure!(
                    *column_id != self.time_index,
                    InvalidMetaSnafu {
                        reason: format!(
                            "column {} is already a time index column",
                            column.column_schema.name,
                        ),
                    }
                );

                // Checks semantic type.
                ensure!(
                    column.semantic_type == SemanticType::Tag,
                    InvalidMetaSnafu {
                        reason: format!(
                            "semantic type of column {} should be Tag, not {:?}",
                            column.column_schema.name, column.semantic_type
                        ),
                    }
                );

                pk_ids.insert(column_id);
            }
        }

        // Checks tag semantic type.
        // Every primary key column was verified to be a tag above, so equal counts
        // mean every tag column is part of the primary key.
        let num_tag = self
            .column_metadatas
            .iter()
            .filter(|col| col.semantic_type == SemanticType::Tag)
            .count();
        ensure!(
            num_tag == self.primary_key.len(),
            InvalidMetaSnafu {
                reason: format!(
                    "number of primary key columns {} not equal to tag columns {}",
                    self.primary_key.len(),
                    num_tag
                ),
            }
        );

        Ok(())
    }
492
493    /// Checks whether it is a valid column.
494    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
495        if column_metadata.semantic_type == SemanticType::Timestamp {
496            ensure!(
497                column_metadata.column_schema.data_type.is_timestamp(),
498                InvalidMetaSnafu {
499                    reason: format!(
500                        "column `{}` is not timestamp type",
501                        column_metadata.column_schema.name
502                    ),
503                }
504            );
505        }
506
507        ensure!(
508            !is_internal_column(&column_metadata.column_schema.name),
509            InvalidMetaSnafu {
510                reason: format!(
511                    "{} is internal column name that can not be used",
512                    column_metadata.column_schema.name
513                ),
514            }
515        );
516
517        Ok(())
518    }
519}
520
/// Builder to build [RegionMetadata].
///
/// The builder only stores the fields that are serialized; the remaining fields
/// of [RegionMetadata] are derived from the columns in
/// [build](RegionMetadataBuilder::build).
pub struct RegionMetadataBuilder {
    /// Id of the region being built.
    region_id: RegionId,
    /// Columns of the region, in schema order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered list of column ids forming the primary key.
    primary_key: Vec<ColumnId>,
    /// Version of the region schema.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
}
529
530impl RegionMetadataBuilder {
531    /// Returns a new builder.
532    pub fn new(id: RegionId) -> Self {
533        Self {
534            region_id: id,
535            column_metadatas: vec![],
536            primary_key: vec![],
537            schema_version: 0,
538            primary_key_encoding: PrimaryKeyEncoding::Dense,
539        }
540    }
541
542    /// Creates a builder from existing [RegionMetadata].
543    pub fn from_existing(existing: RegionMetadata) -> Self {
544        Self {
545            column_metadatas: existing.column_metadatas,
546            primary_key: existing.primary_key,
547            region_id: existing.region_id,
548            schema_version: existing.schema_version,
549            primary_key_encoding: existing.primary_key_encoding,
550        }
551    }
552
    /// Sets the primary key encoding mode.
    ///
    /// Returns the builder itself so calls can be chained.
    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
        self.primary_key_encoding = encoding;
        self
    }
558
    /// Pushes a new column metadata to this region's metadata.
    ///
    /// The column is appended after the existing columns. Nothing is validated
    /// here; validation happens in [build](RegionMetadataBuilder::build).
    /// Returns the builder itself so calls can be chained.
    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
        self.column_metadatas.push(column_metadata);
        self
    }
564
    /// Sets the primary key of the region.
    ///
    /// Replaces any previously set primary key with `key`.
    /// Returns the builder itself so calls can be chained.
    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
        self.primary_key = key;
        self
    }
570
    /// Increases the schema version by 1.
    ///
    /// Returns the builder itself so calls can be chained.
    pub fn bump_version(&mut self) -> &mut Self {
        self.schema_version += 1;
        self
    }
576
577    /// Applies the alter `kind` to the builder.
578    ///
579    /// The `kind` should be valid.
580    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
581        match kind {
582            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
583            AlterKind::DropColumns { names } => self.drop_columns(&names),
584            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
585            AlterKind::SetIndex { options } => match options {
586                ApiSetIndexOptions::Fulltext {
587                    column_name,
588                    options,
589                } => self.change_column_fulltext_options(column_name, true, Some(options))?,
590                ApiSetIndexOptions::Inverted { column_name } => {
591                    self.change_column_inverted_index_options(column_name, true)?
592                }
593                ApiSetIndexOptions::Skipping {
594                    column_name,
595                    options,
596                } => self.change_column_skipping_index_options(column_name, Some(options))?,
597            },
598            AlterKind::UnsetIndex { options } => match options {
599                ApiUnsetIndexOptions::Fulltext { column_name } => {
600                    self.change_column_fulltext_options(column_name, false, None)?
601                }
602                ApiUnsetIndexOptions::Inverted { column_name } => {
603                    self.change_column_inverted_index_options(column_name, false)?
604                }
605                ApiUnsetIndexOptions::Skipping { column_name } => {
606                    self.change_column_skipping_index_options(column_name, None)?
607                }
608            },
609            AlterKind::SetRegionOptions { options: _ } => {
610                // nothing to be done with RegionMetadata
611            }
612            AlterKind::UnsetRegionOptions { keys: _ } => {
613                // nothing to be done with RegionMetadata
614            }
615        }
616        Ok(self)
617    }
618
619    /// Consumes the builder and build a [RegionMetadata].
620    pub fn build(self) -> Result<RegionMetadata> {
621        let skipped = SkippedFields::new(&self.column_metadatas)?;
622
623        let meta = RegionMetadata {
624            schema: skipped.schema,
625            time_index: skipped.time_index,
626            id_to_index: skipped.id_to_index,
627            column_metadatas: self.column_metadatas,
628            primary_key: self.primary_key,
629            region_id: self.region_id,
630            schema_version: self.schema_version,
631            primary_key_encoding: self.primary_key_encoding,
632        };
633
634        meta.validate()?;
635
636        Ok(meta)
637    }
638
639    /// Adds columns to the metadata if not exist.
640    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
641        let mut names: HashSet<_> = self
642            .column_metadatas
643            .iter()
644            .map(|col| col.column_schema.name.clone())
645            .collect();
646
647        for add_column in columns {
648            if names.contains(&add_column.column_metadata.column_schema.name) {
649                // Column already exists.
650                continue;
651            }
652
653            let column_id = add_column.column_metadata.column_id;
654            let semantic_type = add_column.column_metadata.semantic_type;
655            let column_name = add_column.column_metadata.column_schema.name.clone();
656            match add_column.location {
657                None => {
658                    self.column_metadatas.push(add_column.column_metadata);
659                }
660                Some(AddColumnLocation::First) => {
661                    self.column_metadatas.insert(0, add_column.column_metadata);
662                }
663                Some(AddColumnLocation::After { column_name }) => {
664                    let pos = self
665                        .column_metadatas
666                        .iter()
667                        .position(|col| col.column_schema.name == column_name)
668                        .context(InvalidRegionRequestSnafu {
669                            region_id: self.region_id,
670                            err: format!(
671                                "column {} not found, failed to add column {} after it",
672                                column_name, add_column.column_metadata.column_schema.name
673                            ),
674                        })?;
675                    // Insert after pos.
676                    self.column_metadatas
677                        .insert(pos + 1, add_column.column_metadata);
678                }
679            }
680            names.insert(column_name);
681            if semantic_type == SemanticType::Tag {
682                // For a new tag, we extend the primary key.
683                self.primary_key.push(column_id);
684            }
685        }
686
687        Ok(())
688    }
689
690    /// Drops columns from the metadata if exist.
691    fn drop_columns(&mut self, names: &[String]) {
692        let name_set: HashSet<_> = names.iter().collect();
693        self.column_metadatas
694            .retain(|col| !name_set.contains(&col.column_schema.name));
695    }
696
697    /// Changes columns type to the metadata if exist.
698    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
699        let mut change_type_map: HashMap<_, _> = columns
700            .into_iter()
701            .map(
702                |ModifyColumnType {
703                     column_name,
704                     target_type,
705                 }| (column_name, target_type),
706            )
707            .collect();
708
709        for column_meta in self.column_metadatas.iter_mut() {
710            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
711                column_meta.column_schema.data_type = target_type.clone();
712                // also cast default value to target_type if default value exist
713                let new_default =
714                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
715                        Some(
716                            default_value
717                                .cast_to_datatype(&target_type)
718                                .with_context(|_| CastDefaultValueSnafu {
719                                    reason: format!(
720                                        "Failed to cast default value from {:?} to type {:?}",
721                                        default_value, target_type
722                                    ),
723                                })?,
724                        )
725                    } else {
726                        None
727                    };
728                column_meta.column_schema = column_meta
729                    .column_schema
730                    .clone()
731                    .with_default_constraint(new_default.clone())
732                    .with_context(|_| CastDefaultValueSnafu {
733                        reason: format!("Failed to set new default: {:?}", new_default),
734                    })?;
735            }
736        }
737
738        Ok(())
739    }
740
741    fn change_column_inverted_index_options(
742        &mut self,
743        column_name: String,
744        value: bool,
745    ) -> Result<()> {
746        for column_meta in self.column_metadatas.iter_mut() {
747            if column_meta.column_schema.name == column_name {
748                column_meta.column_schema.set_inverted_index(value)
749            }
750        }
751        Ok(())
752    }
753
754    fn change_column_fulltext_options(
755        &mut self,
756        column_name: String,
757        enable: bool,
758        options: Option<FulltextOptions>,
759    ) -> Result<()> {
760        for column_meta in self.column_metadatas.iter_mut() {
761            if column_meta.column_schema.name == column_name {
762                ensure!(
763                    column_meta.column_schema.data_type.is_string(),
764                    InvalidColumnOptionSnafu {
765                        column_name,
766                        msg: "FULLTEXT index only supports string type".to_string(),
767                    }
768                );
769
770                let current_fulltext_options = column_meta
771                    .column_schema
772                    .fulltext_options()
773                    .context(SetFulltextOptionsSnafu {
774                        column_name: column_name.clone(),
775                    })?;
776
777                if enable {
778                    ensure!(
779                        options.is_some(),
780                        InvalidColumnOptionSnafu {
781                            column_name,
782                            msg: "FULLTEXT index options must be provided",
783                        }
784                    );
785                    set_column_fulltext_options(
786                        column_meta,
787                        column_name,
788                        options.unwrap(),
789                        current_fulltext_options,
790                    )?;
791                } else {
792                    unset_column_fulltext_options(
793                        column_meta,
794                        column_name,
795                        current_fulltext_options,
796                    )?;
797                }
798                break;
799            }
800        }
801        Ok(())
802    }
803
804    fn change_column_skipping_index_options(
805        &mut self,
806        column_name: String,
807        options: Option<SkippingIndexOptions>,
808    ) -> Result<()> {
809        for column_meta in self.column_metadatas.iter_mut() {
810            if column_meta.column_schema.name == column_name {
811                if let Some(options) = &options {
812                    column_meta
813                        .column_schema
814                        .set_skipping_options(options)
815                        .context(UnsetSkippingIndexOptionsSnafu {
816                            column_name: column_name.clone(),
817                        })?;
818                } else {
819                    column_meta.column_schema.unset_skipping_options().context(
820                        UnsetSkippingIndexOptionsSnafu {
821                            column_name: column_name.clone(),
822                        },
823                    )?;
824                }
825            }
826        }
827        Ok(())
828    }
829}
830
/// Fields skipped in serialization.
///
/// All of them are computed from a slice of column metadatas by [`SkippedFields::new`].
struct SkippedFields {
    /// Last schema.
    ///
    /// Built from the column schemas of the column metadatas, in the same order.
    schema: SchemaRef,
    /// Id of the time index column.
    ///
    /// This is the id of the first column whose semantic type is `Timestamp`.
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    id_to_index: HashMap<ColumnId, usize>,
}
840
841impl SkippedFields {
842    /// Constructs skipped fields from `column_metadatas`.
843    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
844        let column_schemas = column_metadatas
845            .iter()
846            .map(|column_metadata| column_metadata.column_schema.clone())
847            .collect();
848        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
849        let time_index = column_metadatas
850            .iter()
851            .find_map(|col| {
852                if col.semantic_type == SemanticType::Timestamp {
853                    Some(col.column_id)
854                } else {
855                    None
856                }
857            })
858            .context(InvalidMetaSnafu {
859                reason: "time index not found",
860            })?;
861        let id_to_index = column_metadatas
862            .iter()
863            .enumerate()
864            .map(|(idx, col)| (col.column_id, idx))
865            .collect();
866
867        Ok(SkippedFields {
868            schema,
869            time_index,
870            id_to_index,
871        })
872    }
873}
874
875#[derive(Snafu)]
876#[snafu(visibility(pub))]
877#[stack_trace_debug]
878pub enum MetadataError {
879    #[snafu(display("Invalid schema"))]
880    InvalidSchema {
881        source: datatypes::error::Error,
882        #[snafu(implicit)]
883        location: Location,
884    },
885
886    #[snafu(display("Invalid metadata, {}", reason))]
887    InvalidMeta {
888        reason: String,
889        #[snafu(implicit)]
890        location: Location,
891    },
892
893    #[snafu(display("Failed to ser/de json object"))]
894    SerdeJson {
895        #[snafu(implicit)]
896        location: Location,
897        #[snafu(source)]
898        error: serde_json::Error,
899    },
900
901    #[snafu(display("Invalid raw region request, err: {}", err))]
902    InvalidRawRegionRequest {
903        err: String,
904        #[snafu(implicit)]
905        location: Location,
906    },
907
908    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
909    InvalidRegionRequest {
910        region_id: RegionId,
911        err: String,
912        #[snafu(implicit)]
913        location: Location,
914    },
915
916    #[snafu(display("Unexpected schema error during project"))]
917    SchemaProject {
918        origin_schema: SchemaRef,
919        projection: Vec<ColumnId>,
920        #[snafu(implicit)]
921        location: Location,
922        source: datatypes::Error,
923    },
924
925    #[snafu(display("Time index column not found"))]
926    TimeIndexNotFound {
927        #[snafu(implicit)]
928        location: Location,
929    },
930
931    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
932    ChangeColumnNotFound {
933        column_name: String,
934        region_id: RegionId,
935        #[snafu(implicit)]
936        location: Location,
937    },
938
939    #[snafu(display("Failed to convert column schema"))]
940    ConvertColumnSchema {
941        source: api::error::Error,
942        #[snafu(implicit)]
943        location: Location,
944    },
945
946    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
947    InvalidSetRegionOptionRequest {
948        key: String,
949        value: String,
950        #[snafu(implicit)]
951        location: Location,
952    },
953
954    #[snafu(display("Invalid set region option request, key: {}", key))]
955    InvalidUnsetRegionOptionRequest {
956        key: String,
957        #[snafu(implicit)]
958        location: Location,
959    },
960
961    #[snafu(display("Failed to decode protobuf"))]
962    DecodeProto {
963        #[snafu(source)]
964        error: prost::UnknownEnumValue,
965        #[snafu(implicit)]
966        location: Location,
967    },
968
969    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
970    InvalidColumnOption {
971        column_name: String,
972        msg: String,
973        #[snafu(implicit)]
974        location: Location,
975    },
976
977    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
978    SetFulltextOptions {
979        column_name: String,
980        source: datatypes::Error,
981        #[snafu(implicit)]
982        location: Location,
983    },
984
985    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
986    SetSkippingIndexOptions {
987        column_name: String,
988        source: datatypes::Error,
989        #[snafu(implicit)]
990        location: Location,
991    },
992
993    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
994    UnsetSkippingIndexOptions {
995        column_name: String,
996        source: datatypes::Error,
997        #[snafu(implicit)]
998        location: Location,
999    },
1000
1001    #[snafu(display("Failed to decode arrow ipc record batches"))]
1002    DecodeArrowIpc {
1003        #[snafu(source)]
1004        error: arrow::error::ArrowError,
1005        #[snafu(implicit)]
1006        location: Location,
1007    },
1008
1009    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1010    CastDefaultValue {
1011        reason: String,
1012        source: datatypes::Error,
1013        #[snafu(implicit)]
1014        location: Location,
1015    },
1016
1017    #[snafu(display("Unexpected: {}", reason))]
1018    Unexpected {
1019        reason: String,
1020        #[snafu(implicit)]
1021        location: Location,
1022    },
1023
1024    #[snafu(display("Failed to encode/decode flight message"))]
1025    FlightCodec {
1026        source: common_grpc::Error,
1027        #[snafu(implicit)]
1028        location: Location,
1029    },
1030
1031    #[snafu(display("Failed to decode prost message"))]
1032    Prost {
1033        #[snafu(source)]
1034        error: prost::DecodeError,
1035        #[snafu(implicit)]
1036        location: Location,
1037    },
1038}
1039
impl ErrorExt for MetadataError {
    /// Returns the status code of this error.
    ///
    /// Every variant is reported as [`StatusCode::InvalidArguments`]; there is no
    /// per-variant mapping.
    fn status_code(&self) -> StatusCode {
        StatusCode::InvalidArguments
    }

    /// Returns this error as a `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
1049
1050/// Set column fulltext options if it passed the validation.
1051///
1052/// Options allowed to modify:
1053/// * backend
1054///
1055/// Options not allowed to modify:
1056/// * analyzer
1057/// * case_sensitive
1058fn set_column_fulltext_options(
1059    column_meta: &mut ColumnMetadata,
1060    column_name: String,
1061    options: FulltextOptions,
1062    current_options: Option<FulltextOptions>,
1063) -> Result<()> {
1064    if let Some(current_options) = current_options {
1065        ensure!(
1066            current_options.analyzer == options.analyzer
1067                && current_options.case_sensitive == options.case_sensitive,
1068            InvalidColumnOptionSnafu {
1069                column_name,
1070                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1071                current_options.analyzer, current_options.case_sensitive),
1072            }
1073        );
1074    }
1075
1076    column_meta
1077        .column_schema
1078        .set_fulltext_options(&options)
1079        .context(SetFulltextOptionsSnafu { column_name })?;
1080
1081    Ok(())
1082}
1083
1084fn unset_column_fulltext_options(
1085    column_meta: &mut ColumnMetadata,
1086    column_name: String,
1087    current_options: Option<FulltextOptions>,
1088) -> Result<()> {
1089    if let Some(mut current_options) = current_options
1090        && current_options.enable
1091    {
1092        current_options.enable = false;
1093        column_meta
1094            .column_schema
1095            .set_fulltext_options(&current_options)
1096            .context(SetFulltextOptionsSnafu { column_name })?;
1097    } else {
1098        return InvalidColumnOptionSnafu {
1099            column_name,
1100            msg: "FULLTEXT index already disabled",
1101        }
1102        .fail();
1103    }
1104
1105    Ok(())
1106}
1107
1108#[cfg(test)]
1109mod test {
1110    use datatypes::prelude::ConcreteDataType;
1111    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1112
1113    use super::*;
1114
1115    fn create_builder() -> RegionMetadataBuilder {
1116        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1117    }
1118
1119    fn build_test_region_metadata() -> RegionMetadata {
1120        let mut builder = create_builder();
1121        builder
1122            .push_column_metadata(ColumnMetadata {
1123                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1124                semantic_type: SemanticType::Tag,
1125                column_id: 1,
1126            })
1127            .push_column_metadata(ColumnMetadata {
1128                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1129                semantic_type: SemanticType::Field,
1130                column_id: 2,
1131            })
1132            .push_column_metadata(ColumnMetadata {
1133                column_schema: ColumnSchema::new(
1134                    "c",
1135                    ConcreteDataType::timestamp_millisecond_datatype(),
1136                    false,
1137                ),
1138                semantic_type: SemanticType::Timestamp,
1139                column_id: 3,
1140            })
1141            .primary_key(vec![1]);
1142        builder.build().unwrap()
1143    }
1144
1145    #[test]
1146    fn test_region_metadata() {
1147        let region_metadata = build_test_region_metadata();
1148        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1149        assert_eq!(
1150            "a",
1151            region_metadata.column_by_id(1).unwrap().column_schema.name
1152        );
1153        assert_eq!(None, region_metadata.column_by_id(10));
1154    }
1155
1156    #[test]
1157    fn test_region_metadata_serde() {
1158        let region_metadata = build_test_region_metadata();
1159        let serialized = serde_json::to_string(&region_metadata).unwrap();
1160        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1161        assert_eq!(region_metadata, deserialized);
1162    }
1163
1164    #[test]
1165    fn test_column_metadata_validate() {
1166        let mut builder = create_builder();
1167        let col = ColumnMetadata {
1168            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1169            semantic_type: SemanticType::Timestamp,
1170            column_id: 1,
1171        };
1172
1173        builder.push_column_metadata(col);
1174        let err = builder.build().unwrap_err();
1175        assert!(
1176            err.to_string()
1177                .contains("column `ts` is not timestamp type"),
1178            "unexpected err: {err}",
1179        );
1180    }
1181
1182    #[test]
1183    fn test_empty_region_metadata() {
1184        let builder = create_builder();
1185        let err = builder.build().unwrap_err();
1186        // A region must have a time index.
1187        assert!(
1188            err.to_string().contains("time index not found"),
1189            "unexpected err: {err}",
1190        );
1191    }
1192
1193    #[test]
1194    fn test_same_column_id() {
1195        let mut builder = create_builder();
1196        builder
1197            .push_column_metadata(ColumnMetadata {
1198                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1199                semantic_type: SemanticType::Tag,
1200                column_id: 1,
1201            })
1202            .push_column_metadata(ColumnMetadata {
1203                column_schema: ColumnSchema::new(
1204                    "b",
1205                    ConcreteDataType::timestamp_millisecond_datatype(),
1206                    false,
1207                ),
1208                semantic_type: SemanticType::Timestamp,
1209                column_id: 1,
1210            });
1211        let err = builder.build().unwrap_err();
1212        assert!(
1213            err.to_string()
1214                .contains("column a and b have the same column id"),
1215            "unexpected err: {err}",
1216        );
1217    }
1218
1219    #[test]
1220    fn test_duplicate_time_index() {
1221        let mut builder = create_builder();
1222        builder
1223            .push_column_metadata(ColumnMetadata {
1224                column_schema: ColumnSchema::new(
1225                    "a",
1226                    ConcreteDataType::timestamp_millisecond_datatype(),
1227                    false,
1228                ),
1229                semantic_type: SemanticType::Timestamp,
1230                column_id: 1,
1231            })
1232            .push_column_metadata(ColumnMetadata {
1233                column_schema: ColumnSchema::new(
1234                    "b",
1235                    ConcreteDataType::timestamp_millisecond_datatype(),
1236                    false,
1237                ),
1238                semantic_type: SemanticType::Timestamp,
1239                column_id: 2,
1240            });
1241        let err = builder.build().unwrap_err();
1242        assert!(
1243            err.to_string().contains("expect only one time index"),
1244            "unexpected err: {err}",
1245        );
1246    }
1247
1248    #[test]
1249    fn test_unknown_primary_key() {
1250        let mut builder = create_builder();
1251        builder
1252            .push_column_metadata(ColumnMetadata {
1253                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1254                semantic_type: SemanticType::Tag,
1255                column_id: 1,
1256            })
1257            .push_column_metadata(ColumnMetadata {
1258                column_schema: ColumnSchema::new(
1259                    "b",
1260                    ConcreteDataType::timestamp_millisecond_datatype(),
1261                    false,
1262                ),
1263                semantic_type: SemanticType::Timestamp,
1264                column_id: 2,
1265            })
1266            .primary_key(vec![3]);
1267        let err = builder.build().unwrap_err();
1268        assert!(
1269            err.to_string().contains("unknown column id 3"),
1270            "unexpected err: {err}",
1271        );
1272    }
1273
1274    #[test]
1275    fn test_same_primary_key() {
1276        let mut builder = create_builder();
1277        builder
1278            .push_column_metadata(ColumnMetadata {
1279                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1280                semantic_type: SemanticType::Tag,
1281                column_id: 1,
1282            })
1283            .push_column_metadata(ColumnMetadata {
1284                column_schema: ColumnSchema::new(
1285                    "b",
1286                    ConcreteDataType::timestamp_millisecond_datatype(),
1287                    false,
1288                ),
1289                semantic_type: SemanticType::Timestamp,
1290                column_id: 2,
1291            })
1292            .primary_key(vec![1, 1]);
1293        let err = builder.build().unwrap_err();
1294        assert!(
1295            err.to_string()
1296                .contains("duplicate column a in primary key"),
1297            "unexpected err: {err}",
1298        );
1299    }
1300
1301    #[test]
1302    fn test_in_time_index() {
1303        let mut builder = create_builder();
1304        builder
1305            .push_column_metadata(ColumnMetadata {
1306                column_schema: ColumnSchema::new(
1307                    "ts",
1308                    ConcreteDataType::timestamp_millisecond_datatype(),
1309                    false,
1310                ),
1311                semantic_type: SemanticType::Timestamp,
1312                column_id: 1,
1313            })
1314            .primary_key(vec![1]);
1315        let err = builder.build().unwrap_err();
1316        assert!(
1317            err.to_string()
1318                .contains("column ts is already a time index column"),
1319            "unexpected err: {err}",
1320        );
1321    }
1322
1323    #[test]
1324    fn test_nullable_time_index() {
1325        let mut builder = create_builder();
1326        builder.push_column_metadata(ColumnMetadata {
1327            column_schema: ColumnSchema::new(
1328                "ts",
1329                ConcreteDataType::timestamp_millisecond_datatype(),
1330                true,
1331            ),
1332            semantic_type: SemanticType::Timestamp,
1333            column_id: 1,
1334        });
1335        let err = builder.build().unwrap_err();
1336        assert!(
1337            err.to_string()
1338                .contains("time index column ts must be NOT NULL"),
1339            "unexpected err: {err}",
1340        );
1341    }
1342
1343    #[test]
1344    fn test_primary_key_semantic_type() {
1345        let mut builder = create_builder();
1346        builder
1347            .push_column_metadata(ColumnMetadata {
1348                column_schema: ColumnSchema::new(
1349                    "ts",
1350                    ConcreteDataType::timestamp_millisecond_datatype(),
1351                    false,
1352                ),
1353                semantic_type: SemanticType::Timestamp,
1354                column_id: 1,
1355            })
1356            .push_column_metadata(ColumnMetadata {
1357                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1358                semantic_type: SemanticType::Field,
1359                column_id: 2,
1360            })
1361            .primary_key(vec![2]);
1362        let err = builder.build().unwrap_err();
1363        assert!(
1364            err.to_string()
1365                .contains("semantic type of column a should be Tag, not Field"),
1366            "unexpected err: {err}",
1367        );
1368    }
1369
1370    #[test]
1371    fn test_primary_key_tag_num() {
1372        let mut builder = create_builder();
1373        builder
1374            .push_column_metadata(ColumnMetadata {
1375                column_schema: ColumnSchema::new(
1376                    "ts",
1377                    ConcreteDataType::timestamp_millisecond_datatype(),
1378                    false,
1379                ),
1380                semantic_type: SemanticType::Timestamp,
1381                column_id: 1,
1382            })
1383            .push_column_metadata(ColumnMetadata {
1384                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1385                semantic_type: SemanticType::Tag,
1386                column_id: 2,
1387            })
1388            .push_column_metadata(ColumnMetadata {
1389                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1390                semantic_type: SemanticType::Tag,
1391                column_id: 3,
1392            })
1393            .primary_key(vec![2]);
1394        let err = builder.build().unwrap_err();
1395        assert!(
1396            err.to_string()
1397                .contains("number of primary key columns 1 not equal to tag columns 2"),
1398            "unexpected err: {err}",
1399        );
1400    }
1401
1402    #[test]
1403    fn test_bump_version() {
1404        let mut region_metadata = build_test_region_metadata();
1405        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1406        builder.bump_version();
1407        let new_meta = builder.build().unwrap();
1408        region_metadata.schema_version += 1;
1409        assert_eq!(region_metadata, new_meta);
1410    }
1411
1412    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1413        let semantic_type = if is_tag {
1414            SemanticType::Tag
1415        } else {
1416            SemanticType::Field
1417        };
1418        ColumnMetadata {
1419            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1420            semantic_type,
1421            column_id,
1422        }
1423    }
1424
1425    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1426        let actual: Vec<_> = metadata
1427            .column_metadatas
1428            .iter()
1429            .map(|col| &col.column_schema.name)
1430            .collect();
1431        assert_eq!(names, actual);
1432    }
1433
1434    #[test]
1435    fn test_alter() {
1436        // a (tag), b (field), c (ts)
1437        let metadata = build_test_region_metadata();
1438        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1439        // tag d
1440        builder
1441            .alter(AlterKind::AddColumns {
1442                columns: vec![AddColumn {
1443                    column_metadata: new_column_metadata("d", true, 4),
1444                    location: None,
1445                }],
1446            })
1447            .unwrap();
1448        let metadata = builder.build().unwrap();
1449        check_columns(&metadata, &["a", "b", "c", "d"]);
1450        assert_eq!([1, 4], &metadata.primary_key[..]);
1451
1452        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1453        builder
1454            .alter(AlterKind::AddColumns {
1455                columns: vec![AddColumn {
1456                    column_metadata: new_column_metadata("e", false, 5),
1457                    location: Some(AddColumnLocation::First),
1458                }],
1459            })
1460            .unwrap();
1461        let metadata = builder.build().unwrap();
1462        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1463
1464        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1465        builder
1466            .alter(AlterKind::AddColumns {
1467                columns: vec![AddColumn {
1468                    column_metadata: new_column_metadata("f", false, 6),
1469                    location: Some(AddColumnLocation::After {
1470                        column_name: "b".to_string(),
1471                    }),
1472                }],
1473            })
1474            .unwrap();
1475        let metadata = builder.build().unwrap();
1476        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1477
1478        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1479        builder
1480            .alter(AlterKind::AddColumns {
1481                columns: vec![AddColumn {
1482                    column_metadata: new_column_metadata("g", false, 7),
1483                    location: Some(AddColumnLocation::After {
1484                        column_name: "d".to_string(),
1485                    }),
1486                }],
1487            })
1488            .unwrap();
1489        let metadata = builder.build().unwrap();
1490        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1491
1492        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1493        builder
1494            .alter(AlterKind::DropColumns {
1495                names: vec!["g".to_string(), "e".to_string()],
1496            })
1497            .unwrap();
1498        let metadata = builder.build().unwrap();
1499        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1500
1501        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1502        builder
1503            .alter(AlterKind::DropColumns {
1504                names: vec!["a".to_string()],
1505            })
1506            .unwrap();
1507        // Build returns error as the primary key contains a.
1508        let err = builder.build().unwrap_err();
1509        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1510
1511        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1512        builder
1513            .alter(AlterKind::ModifyColumnTypes {
1514                columns: vec![ModifyColumnType {
1515                    column_name: "b".to_string(),
1516                    target_type: ConcreteDataType::string_datatype(),
1517                }],
1518            })
1519            .unwrap();
1520        let metadata = builder.build().unwrap();
1521        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1522        let b_type = &metadata
1523            .column_by_name("b")
1524            .unwrap()
1525            .column_schema
1526            .data_type;
1527        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1528
1529        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1530        builder
1531            .alter(AlterKind::SetIndex {
1532                options: ApiSetIndexOptions::Fulltext {
1533                    column_name: "b".to_string(),
1534                    options: FulltextOptions {
1535                        enable: true,
1536                        analyzer: FulltextAnalyzer::Chinese,
1537                        case_sensitive: true,
1538                        backend: FulltextBackend::Bloom,
1539                    },
1540                },
1541            })
1542            .unwrap();
1543        let metadata = builder.build().unwrap();
1544        let a_fulltext_options = metadata
1545            .column_by_name("b")
1546            .unwrap()
1547            .column_schema
1548            .fulltext_options()
1549            .unwrap()
1550            .unwrap();
1551        assert!(a_fulltext_options.enable);
1552        assert_eq!(
1553            datatypes::schema::FulltextAnalyzer::Chinese,
1554            a_fulltext_options.analyzer
1555        );
1556        assert!(a_fulltext_options.case_sensitive);
1557
1558        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1559        builder
1560            .alter(AlterKind::UnsetIndex {
1561                options: ApiUnsetIndexOptions::Fulltext {
1562                    column_name: "b".to_string(),
1563                },
1564            })
1565            .unwrap();
1566        let metadata = builder.build().unwrap();
1567        let a_fulltext_options = metadata
1568            .column_by_name("b")
1569            .unwrap()
1570            .column_schema
1571            .fulltext_options()
1572            .unwrap()
1573            .unwrap();
1574        assert!(!a_fulltext_options.enable);
1575        assert_eq!(
1576            datatypes::schema::FulltextAnalyzer::Chinese,
1577            a_fulltext_options.analyzer
1578        );
1579        assert!(a_fulltext_options.case_sensitive);
1580    }
1581
1582    #[test]
1583    fn test_add_if_not_exists() {
1584        // a (tag), b (field), c (ts)
1585        let metadata = build_test_region_metadata();
1586        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1587        // tag d
1588        builder
1589            .alter(AlterKind::AddColumns {
1590                columns: vec![
1591                    AddColumn {
1592                        column_metadata: new_column_metadata("d", true, 4),
1593                        location: None,
1594                    },
1595                    AddColumn {
1596                        column_metadata: new_column_metadata("d", true, 4),
1597                        location: None,
1598                    },
1599                ],
1600            })
1601            .unwrap();
1602        let metadata = builder.build().unwrap();
1603        check_columns(&metadata, &["a", "b", "c", "d"]);
1604        assert_eq!([1, 4], &metadata.primary_key[..]);
1605
1606        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1607        // field b.
1608        builder
1609            .alter(AlterKind::AddColumns {
1610                columns: vec![AddColumn {
1611                    column_metadata: new_column_metadata("b", false, 2),
1612                    location: None,
1613                }],
1614            })
1615            .unwrap();
1616        let metadata = builder.build().unwrap();
1617        check_columns(&metadata, &["a", "b", "c", "d"]);
1618    }
1619
1620    #[test]
1621    fn test_add_column_with_inverted_index() {
1622        // only set inverted index to true explicitly will this column be inverted indexed
1623
1624        // a (tag), b (field), c (ts)
1625        let metadata = build_test_region_metadata();
1626        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1627        // tag d, e
1628        let mut col = new_column_metadata("d", true, 4);
1629        col.column_schema.set_inverted_index(true);
1630        builder
1631            .alter(AlterKind::AddColumns {
1632                columns: vec![
1633                    AddColumn {
1634                        column_metadata: col,
1635                        location: None,
1636                    },
1637                    AddColumn {
1638                        column_metadata: new_column_metadata("e", true, 5),
1639                        location: None,
1640                    },
1641                ],
1642            })
1643            .unwrap();
1644        let metadata = builder.build().unwrap();
1645        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1646        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1647        let column_metadata = metadata.column_by_name("a").unwrap();
1648        assert!(!column_metadata.column_schema.is_inverted_indexed());
1649        let column_metadata = metadata.column_by_name("b").unwrap();
1650        assert!(!column_metadata.column_schema.is_inverted_indexed());
1651        let column_metadata = metadata.column_by_name("c").unwrap();
1652        assert!(!column_metadata.column_schema.is_inverted_indexed());
1653        let column_metadata = metadata.column_by_name("d").unwrap();
1654        assert!(column_metadata.column_schema.is_inverted_indexed());
1655        let column_metadata = metadata.column_by_name("e").unwrap();
1656        assert!(!column_metadata.column_schema.is_inverted_indexed());
1657    }
1658
#[test]
fn test_drop_if_exists() {
    // The base region metadata has columns a (tag), b (field), c (ts).
    let base = build_test_region_metadata();

    // Extend it with two more fields, d and e.
    let mut builder = RegionMetadataBuilder::from_existing(base);
    let extra_fields = vec![
        AddColumn {
            column_metadata: new_column_metadata("d", false, 4),
            location: None,
        },
        AddColumn {
            column_metadata: new_column_metadata("e", false, 5),
            location: None,
        },
    ];
    builder
        .alter(AlterKind::AddColumns {
            columns: extra_fields,
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d", "e"]);

    // Applies a `DropColumns` alteration on top of `current` and returns
    // the rebuilt metadata.
    let drop_columns = |current, names: &[&str]| {
        let mut builder = RegionMetadataBuilder::from_existing(current);
        builder
            .alter(AlterKind::DropColumns {
                names: names.iter().map(|name| name.to_string()).collect(),
            })
            .unwrap();
        builder.build().unwrap()
    };

    // Listing the same column twice in one request must succeed.
    let metadata = drop_columns(metadata, &["b", "b"]);
    check_columns(&metadata, &["a", "c", "d", "e"]);

    // Naming the already dropped column `b` must succeed too, while `e`
    // is still removed.
    let metadata = drop_columns(metadata, &["b", "e"]);
    check_columns(&metadata, &["a", "c", "d"]);
}
1700
#[test]
fn test_invalid_column_name() {
    let mut builder = create_builder();

    // A column named `__sequence`; the build below is expected to reject it
    // as an internal column name.
    let reserved_column = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__sequence",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };
    builder.push_column_metadata(reserved_column);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("internal column name that can not be used"),
        "unexpected err: {err}",
    );
}
1720
#[test]
fn test_debug_for_column_metadata() {
    // Expected `Debug` rendering of the test region metadata, including the
    // bracketed per-column form.
    let expected = "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }";

    let rendered = format!("{:?}", build_test_region_metadata());
    assert_eq!(rendered, expected);
}
1727
#[test]
fn test_region_metadata_deserialize_default_primary_key_encoding() {
    // Each case pairs a serialized `RegionMetadata` with the primary key
    // encoding expected after deserialization.
    let cases = [
        // No `primary_key_encoding` field in the payload: expect `Dense`.
        (
            r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#,
            PrimaryKeyEncoding::Dense,
        ),
        // Explicit `"primary_key_encoding":"sparse"`: expect `Sparse`.
        (
            r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#,
            PrimaryKeyEncoding::Sparse,
        ),
    ];

    for (json, expected_encoding) in cases {
        let metadata: RegionMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(metadata.primary_key_encoding, expected_encoding);
    }
}
1741}