store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_base::hash::partition_expr_version;
28use common_error::ext::ErrorExt;
29use common_error::status_code::StatusCode;
30use common_macro::stack_trace_debug;
31use datatypes::arrow;
32use datatypes::arrow::datatypes::FieldRef;
33use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
34use datatypes::types::TimestampType;
35use itertools::Itertools;
36use serde::de::Error;
37use serde::{Deserialize, Deserializer, Serialize};
38use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
39
40use crate::codec::PrimaryKeyEncoding;
41use crate::region_request::{
42    AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
43};
44use crate::storage::consts::is_internal_column;
45use crate::storage::{ColumnId, RegionId};
46
/// Result type of this module whose error type is [MetadataError].
pub type Result<T> = std::result::Result<T, MetadataError>;
48
/// Metadata of a column.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Immutable id of this column, unique within a region.
    pub column_id: ColumnId,
}
59
60impl fmt::Debug for ColumnMetadata {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(
63            f,
64            "[{:?} {:?} {:?}]",
65            self.column_schema, self.semantic_type, self.column_id,
66        )
67    }
68}
69
70impl ColumnMetadata {
71    /// Construct `Self` from protobuf struct [RegionColumnDef]
72    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
73        let column_id = column_def.column_id;
74        let column_def = column_def
75            .column_def
76            .context(InvalidRawRegionRequestSnafu {
77                err: "column_def is absent",
78            })?;
79        let semantic_type = column_def.semantic_type();
80        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
81
82        Ok(Self {
83            column_schema,
84            semantic_type,
85            column_id,
86        })
87    }
88
89    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
90    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
91        serde_json::to_vec(columns)
92    }
93
94    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
95    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
96        serde_json::from_slice(bytes)
97    }
98
99    pub fn is_same_datatype(&self, other: &Self) -> bool {
100        self.column_schema.data_type == other.column_schema.data_type
101    }
102}
103
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    pub schema: SchemaRef,

    // We don't make `time_index` and `id_to_index` public and always construct them
    // via [SkippedFields], so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Partition expression serialized as a JSON string.
    /// Compatibility behavior:
    /// - None: no partition expr was ever set in the manifest (legacy regions).
    /// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is.
    pub partition_expr: Option<String>,
    /// Version value derived from `partition_expr` via `partition_expr_version`.
    ///
    /// It is skipped by serde and recomputed from `partition_expr` when the
    /// metadata is deserialized or built.
    #[serde(skip)]
    pub partition_expr_version: u64,
}
167
168impl fmt::Debug for RegionMetadata {
169    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
170        f.debug_struct("RegionMetadata")
171            .field("column_metadatas", &self.column_metadatas)
172            .field("time_index", &self.time_index)
173            .field("primary_key", &self.primary_key)
174            .field("region_id", &self.region_id)
175            .field("schema_version", &self.schema_version)
176            .field("partition_expr", &self.partition_expr)
177            .finish()
178    }
179}
180
/// Reference-counted ([Arc]) pointer to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
182
183impl<'de> Deserialize<'de> for RegionMetadata {
184    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
185    where
186        D: Deserializer<'de>,
187    {
188        // helper internal struct for deserialization
189        #[derive(Deserialize)]
190        struct RegionMetadataWithoutSchema {
191            column_metadatas: Vec<ColumnMetadata>,
192            primary_key: Vec<ColumnId>,
193            region_id: RegionId,
194            schema_version: u64,
195            #[serde(default)]
196            primary_key_encoding: PrimaryKeyEncoding,
197            #[serde(default)]
198            partition_expr: Option<String>,
199        }
200
201        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
202        let skipped =
203            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
204
205        let partition_expr_version =
206            partition_expr_version(without_schema.partition_expr.as_deref());
207
208        Ok(Self {
209            schema: skipped.schema,
210            time_index: skipped.time_index,
211            id_to_index: skipped.id_to_index,
212            column_metadatas: without_schema.column_metadatas,
213            primary_key: without_schema.primary_key,
214            region_id: without_schema.region_id,
215            schema_version: without_schema.schema_version,
216            primary_key_encoding: without_schema.primary_key_encoding,
217            partition_expr: without_schema.partition_expr,
218            partition_expr_version,
219        })
220    }
221}
222
223impl RegionMetadata {
    /// Decode the metadata from a JSON str.
    ///
    /// # Errors
    /// Returns an error if `s` cannot be deserialized into a [RegionMetadata].
    pub fn from_json(s: &str) -> Result<Self> {
        serde_json::from_str(s).context(SerdeJsonSnafu)
    }
228
229    /// Encode the metadata to a JSON string.
230    pub fn to_json(&self) -> Result<String> {
231        serde_json::to_string(&self).context(SerdeJsonSnafu)
232    }
233
234    pub fn set_partition_expr(&mut self, expr: Option<String>) {
235        self.partition_expr_version = partition_expr_version(expr.as_deref());
236        self.partition_expr = expr;
237    }
238
239    /// Find column by id.
240    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
241        self.id_to_index
242            .get(&column_id)
243            .map(|index| &self.column_metadatas[*index])
244    }
245
    /// Find column index by id.
    ///
    /// Returns the position of the column in
    /// [column_metadatas](RegionMetadata::column_metadatas), or `None` if the id is unknown.
    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.id_to_index.get(&column_id).copied()
    }
250
251    /// Find column index by name.
252    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
253        self.column_metadatas
254            .iter()
255            .position(|col| col.column_schema.name == column_name)
256    }
257
258    /// Returns the time index column
259    ///
260    /// # Panics
261    /// Panics if the time index column id is invalid.
262    pub fn time_index_column(&self) -> &ColumnMetadata {
263        let index = self.id_to_index[&self.time_index];
264        &self.column_metadatas[index]
265    }
266
267    /// Returns timestamp type of time index column
268    ///
269    /// # Panics
270    /// Panics if the time index column id is invalid.
271    pub fn time_index_type(&self) -> TimestampType {
272        let index = self.id_to_index[&self.time_index];
273        self.column_metadatas[index]
274            .column_schema
275            .data_type
276            .as_timestamp()
277            .unwrap()
278    }
279
    /// Returns the position of the time index.
    ///
    /// # Panics
    /// Panics if the time index column id is not present in the id lookup map.
    pub fn time_index_column_pos(&self) -> usize {
        self.id_to_index[&self.time_index]
    }
284
285    /// Returns the arrow field of the time index column.
286    pub fn time_index_field(&self) -> FieldRef {
287        let index = self.id_to_index[&self.time_index];
288        self.schema.arrow_schema().fields[index].clone()
289    }
290
291    /// Finds a column by name.
292    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
293        self.schema
294            .column_index_by_name(name)
295            .map(|index| &self.column_metadatas[index])
296    }
297
298    /// Returns all primary key columns.
299    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
300        // safety: RegionMetadata::validate ensures every primary key exists.
301        self.primary_key
302            .iter()
303            .map(|id| self.column_by_id(*id).unwrap())
304    }
305
306    /// Returns all field columns before projection.
307    ///
308    /// **Use with caution**. On read path where might have projection, this method
309    /// can return columns that not present in data batch.
310    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
311        self.column_metadatas
312            .iter()
313            .filter(|column| column.semantic_type == SemanticType::Field)
314    }
315
316    /// Returns a column's index in primary key if it is a primary key column.
317    ///
318    /// This does a linear search.
319    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
320        self.primary_key.iter().position(|id| *id == column_id)
321    }
322
323    /// Project the metadata to a new one using specified column ids.
324    ///
325    /// [RegionId] and schema version are preserved.
326    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
327        // check time index
328        ensure!(
329            projection.contains(&self.time_index),
330            TimeIndexNotFoundSnafu
331        );
332
333        // prepare new indices
334        let indices_to_preserve = projection
335            .iter()
336            .map(|id| {
337                self.column_index_by_id(*id)
338                    .with_context(|| InvalidRegionRequestSnafu {
339                        region_id: self.region_id,
340                        err: format!("column id {} not found", id),
341                    })
342            })
343            .collect::<Result<Vec<_>>>()?;
344
345        // project schema
346        let projected_schema =
347            self.schema
348                .try_project(&indices_to_preserve)
349                .with_context(|_| SchemaProjectSnafu {
350                    origin_schema: self.schema.clone(),
351                    projection: projection.to_vec(),
352                })?;
353
354        // project columns, generate projected primary key and new id_to_index
355        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
356        let mut projected_primary_key = vec![];
357        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
358        for index in indices_to_preserve {
359            let col = self.column_metadatas[index].clone();
360            if col.semantic_type == SemanticType::Tag {
361                projected_primary_key.push(col.column_id);
362            }
363            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
364            projected_column_metadatas.push(col);
365        }
366
367        Ok(RegionMetadata {
368            schema: Arc::new(projected_schema),
369            time_index: self.time_index,
370            id_to_index: projected_id_to_index,
371            column_metadatas: projected_column_metadatas,
372            primary_key: projected_primary_key,
373            region_id: self.region_id,
374            schema_version: self.schema_version,
375            primary_key_encoding: self.primary_key_encoding,
376            partition_expr: self.partition_expr.clone(),
377            partition_expr_version: partition_expr_version(self.partition_expr.as_deref()),
378        })
379    }
380
381    /// Gets the column ids to be indexed by inverted index.
382    pub fn inverted_indexed_column_ids<'a>(
383        &self,
384        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
385    ) -> HashSet<ColumnId> {
386        let mut inverted_index = self
387            .column_metadatas
388            .iter()
389            .filter(|column| column.column_schema.is_inverted_indexed())
390            .map(|column| column.column_id)
391            .collect::<HashSet<_>>();
392
393        for ignored in ignore_column_ids {
394            inverted_index.remove(ignored);
395        }
396
397        inverted_index
398    }
399
400    /// Gets the column IDs that have vector indexes along with their options.
401    /// Returns a map from column ID to the vector index options.
402    pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
403        self.column_metadatas
404            .iter()
405            .filter_map(|column| {
406                column
407                    .column_schema
408                    .vector_index_options()
409                    .ok()
410                    .flatten()
411                    .map(|options| (column.column_id, options))
412            })
413            .collect()
414    }
415
416    /// Checks whether the metadata is valid.
417    fn validate(&self) -> Result<()> {
418        // Id to name.
419        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
420        for col in &self.column_metadatas {
421            // Validate each column.
422            Self::validate_column_metadata(col)?;
423
424            // Check whether column id is duplicated. We already check column name
425            // is unique in `Schema` so we only check column id here.
426            ensure!(
427                !id_names.contains_key(&col.column_id),
428                InvalidMetaSnafu {
429                    reason: format!(
430                        "column {} and {} have the same column id {}",
431                        id_names[&col.column_id], col.column_schema.name, col.column_id,
432                    ),
433                }
434            );
435            id_names.insert(col.column_id, &col.column_schema.name);
436        }
437
438        // Checks there is only one time index.
439        let time_indexes = self
440            .column_metadatas
441            .iter()
442            .filter(|col| col.semantic_type == SemanticType::Timestamp)
443            .collect::<Vec<_>>();
444        ensure!(
445            time_indexes.len() == 1,
446            InvalidMetaSnafu {
447                reason: format!(
448                    "expect only one time index, found {}: {}",
449                    time_indexes.len(),
450                    time_indexes
451                        .iter()
452                        .map(|c| &c.column_schema.name)
453                        .join(", ")
454                ),
455            }
456        );
457
458        // Checks the time index column is not nullable.
459        ensure!(
460            !self.time_index_column().column_schema.is_nullable(),
461            InvalidMetaSnafu {
462                reason: format!(
463                    "time index column {} must be NOT NULL",
464                    self.time_index_column().column_schema.name
465                ),
466            }
467        );
468
469        if !self.primary_key.is_empty() {
470            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
471            // Checks column ids in the primary key is valid.
472            for column_id in &self.primary_key {
473                // Checks whether the column id exists.
474                ensure!(
475                    id_names.contains_key(column_id),
476                    InvalidMetaSnafu {
477                        reason: format!("unknown column id {}", column_id),
478                    }
479                );
480
481                // Safety: Column with specific id must exist.
482                let column = self.column_by_id(*column_id).unwrap();
483                // Checks duplicate.
484                ensure!(
485                    !pk_ids.contains(&column_id),
486                    InvalidMetaSnafu {
487                        reason: format!(
488                            "duplicate column {} in primary key",
489                            column.column_schema.name
490                        ),
491                    }
492                );
493
494                // Checks this is not a time index column.
495                ensure!(
496                    *column_id != self.time_index,
497                    InvalidMetaSnafu {
498                        reason: format!(
499                            "column {} is already a time index column",
500                            column.column_schema.name,
501                        ),
502                    }
503                );
504
505                // Checks semantic type.
506                ensure!(
507                    column.semantic_type == SemanticType::Tag,
508                    InvalidMetaSnafu {
509                        reason: format!(
510                            "semantic type of column {} should be Tag, not {:?}",
511                            column.column_schema.name, column.semantic_type
512                        ),
513                    }
514                );
515
516                pk_ids.insert(column_id);
517            }
518        }
519
520        // Checks tag semantic type.
521        let num_tag = self
522            .column_metadatas
523            .iter()
524            .filter(|col| col.semantic_type == SemanticType::Tag)
525            .count();
526        ensure!(
527            num_tag == self.primary_key.len(),
528            InvalidMetaSnafu {
529                reason: format!(
530                    "number of primary key columns {} not equal to tag columns {}",
531                    self.primary_key.len(),
532                    num_tag
533                ),
534            }
535        );
536
537        Ok(())
538    }
539
540    /// Checks whether it is a valid column.
541    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
542        if column_metadata.semantic_type == SemanticType::Timestamp {
543            ensure!(
544                column_metadata.column_schema.data_type.is_timestamp(),
545                InvalidMetaSnafu {
546                    reason: format!(
547                        "column `{}` is not timestamp type",
548                        column_metadata.column_schema.name
549                    ),
550                }
551            );
552        }
553
554        ensure!(
555            !is_internal_column(&column_metadata.column_schema.name),
556            InvalidMetaSnafu {
557                reason: format!(
558                    "{} is internal column name that can not be used",
559                    column_metadata.column_schema.name
560                ),
561            }
562        );
563
564        Ok(())
565    }
566}
567
/// Builder to build [RegionMetadata].
pub struct RegionMetadataBuilder {
    /// Id of the region the metadata belongs to.
    region_id: RegionId,
    /// Columns collected so far, in schema order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered column ids of the primary key.
    primary_key: Vec<ColumnId>,
    /// Schema version to store in the built metadata.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Partition expression serialized as a JSON string, if any.
    partition_expr: Option<String>,
}
577
578impl RegionMetadataBuilder {
579    /// Returns a new builder.
580    pub fn new(id: RegionId) -> Self {
581        Self {
582            region_id: id,
583            column_metadatas: vec![],
584            primary_key: vec![],
585            schema_version: 0,
586            primary_key_encoding: PrimaryKeyEncoding::Dense,
587            partition_expr: None,
588        }
589    }
590
591    /// Creates a builder from existing [RegionMetadata].
592    pub fn from_existing(existing: RegionMetadata) -> Self {
593        Self {
594            column_metadatas: existing.column_metadatas,
595            primary_key: existing.primary_key,
596            region_id: existing.region_id,
597            schema_version: existing.schema_version,
598            primary_key_encoding: existing.primary_key_encoding,
599            partition_expr: existing.partition_expr,
600        }
601    }
602
    /// Sets the primary key encoding mode.
    ///
    /// Returns the builder so that calls can be chained.
    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
        self.primary_key_encoding = encoding;
        self
    }
608
    /// Sets the partition expression in JSON string form.
    ///
    /// Passing `None` clears any previously set expression.
    /// Returns the builder so that calls can be chained.
    pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
        self.partition_expr = expr_json;
        self
    }
614
    /// Pushes a new column metadata to this region's metadata.
    ///
    /// The column is appended after all existing columns. The primary key is not
    /// touched, even if the column is a tag.
    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
        self.column_metadatas.push(column_metadata);
        self
    }
620
    /// Sets the primary key of the region.
    ///
    /// Replaces the whole list of primary key column ids with `key`.
    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
        self.primary_key = key;
        self
    }
626
    /// Increases the schema version by 1.
    ///
    /// Returns the builder so that calls can be chained.
    pub fn bump_version(&mut self) -> &mut Self {
        self.schema_version += 1;
        self
    }
632
633    /// Applies the alter `kind` to the builder.
634    ///
635    /// The `kind` should be valid.
636    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
637        match kind {
638            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
639            AlterKind::DropColumns { names } => self.drop_columns(&names),
640            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
641            AlterKind::SetIndexes { options } => self.set_indexes(options)?,
642            AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
643            AlterKind::SetRegionOptions { options: _ } => {
644                // nothing to be done with RegionMetadata
645            }
646            AlterKind::UnsetRegionOptions { keys: _ } => {
647                // nothing to be done with RegionMetadata
648            }
649            AlterKind::DropDefaults { names } => {
650                self.drop_defaults(names)?;
651            }
652            AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
653            AlterKind::SyncColumns { column_metadatas } => {
654                self.primary_key = column_metadatas
655                    .iter()
656                    .filter_map(|column_metadata| {
657                        if column_metadata.semantic_type == SemanticType::Tag {
658                            Some(column_metadata.column_id)
659                        } else {
660                            None
661                        }
662                    })
663                    .collect::<Vec<_>>();
664                self.column_metadatas = column_metadatas;
665            }
666        }
667        Ok(self)
668    }
669
    /// Consumes the builder and build a [RegionMetadata].
    ///
    /// # Errors
    /// Fails if the derived fields cannot be constructed from the columns or
    /// if the resulting metadata does not pass validation.
    pub fn build(self) -> Result<RegionMetadata> {
        self.build_with_options(true)
    }
674
    /// Builds metadata without running validation.
    ///
    /// Intended for file/external engines that should accept arbitrary schemas
    /// coming from files.
    ///
    /// # Errors
    /// Still fails if the derived fields cannot be constructed from the columns.
    pub fn build_without_validation(self) -> Result<RegionMetadata> {
        self.build_with_options(false)
    }
682
683    fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
684        let skipped = SkippedFields::new(&self.column_metadatas)?;
685
686        let partition_expr_version = partition_expr_version(self.partition_expr.as_deref());
687        let meta = RegionMetadata {
688            schema: skipped.schema,
689            time_index: skipped.time_index,
690            id_to_index: skipped.id_to_index,
691            column_metadatas: self.column_metadatas,
692            primary_key: self.primary_key,
693            region_id: self.region_id,
694            schema_version: self.schema_version,
695            primary_key_encoding: self.primary_key_encoding,
696            partition_expr: self.partition_expr,
697            partition_expr_version,
698        };
699
700        if validate {
701            meta.validate()?;
702        }
703
704        Ok(meta)
705    }
706
707    /// Adds columns to the metadata if not exist.
708    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
709        let mut names: HashSet<_> = self
710            .column_metadatas
711            .iter()
712            .map(|col| col.column_schema.name.clone())
713            .collect();
714
715        for add_column in columns {
716            if names.contains(&add_column.column_metadata.column_schema.name) {
717                // Column already exists.
718                continue;
719            }
720
721            let column_id = add_column.column_metadata.column_id;
722            let semantic_type = add_column.column_metadata.semantic_type;
723            let column_name = add_column.column_metadata.column_schema.name.clone();
724            match add_column.location {
725                None => {
726                    self.column_metadatas.push(add_column.column_metadata);
727                }
728                Some(AddColumnLocation::First) => {
729                    self.column_metadatas.insert(0, add_column.column_metadata);
730                }
731                Some(AddColumnLocation::After { column_name }) => {
732                    let pos = self
733                        .column_metadatas
734                        .iter()
735                        .position(|col| col.column_schema.name == column_name)
736                        .context(InvalidRegionRequestSnafu {
737                            region_id: self.region_id,
738                            err: format!(
739                                "column {} not found, failed to add column {} after it",
740                                column_name, add_column.column_metadata.column_schema.name
741                            ),
742                        })?;
743                    // Insert after pos.
744                    self.column_metadatas
745                        .insert(pos + 1, add_column.column_metadata);
746                }
747            }
748            names.insert(column_name);
749            if semantic_type == SemanticType::Tag {
750                // For a new tag, we extend the primary key.
751                self.primary_key.push(column_id);
752            }
753        }
754
755        Ok(())
756    }
757
758    /// Drops columns from the metadata if exist.
759    fn drop_columns(&mut self, names: &[String]) {
760        let name_set: HashSet<_> = names.iter().collect();
761        self.column_metadatas
762            .retain(|col| !name_set.contains(&col.column_schema.name));
763    }
764
765    /// Changes columns type to the metadata if exist.
766    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
767        let mut change_type_map: HashMap<_, _> = columns
768            .into_iter()
769            .map(
770                |ModifyColumnType {
771                     column_name,
772                     target_type,
773                 }| (column_name, target_type),
774            )
775            .collect();
776
777        for column_meta in self.column_metadatas.iter_mut() {
778            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
779                column_meta.column_schema.data_type = target_type.clone();
780                // also cast default value to target_type if default value exist
781                let new_default =
782                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
783                        Some(
784                            default_value
785                                .cast_to_datatype(&target_type)
786                                .with_context(|_| CastDefaultValueSnafu {
787                                    reason: format!(
788                                        "Failed to cast default value from {:?} to type {:?}",
789                                        default_value, target_type
790                                    ),
791                                })?,
792                        )
793                    } else {
794                        None
795                    };
796                column_meta.column_schema = column_meta
797                    .column_schema
798                    .clone()
799                    .with_default_constraint(new_default.clone())
800                    .with_context(|_| CastDefaultValueSnafu {
801                        reason: format!("Failed to set new default: {:?}", new_default),
802                    })?;
803            }
804        }
805
806        Ok(())
807    }
808
809    fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
810        let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
811        for option in &options {
812            set_index_map
813                .entry(option.column_name())
814                .or_default()
815                .push(option);
816        }
817
818        for column_metadata in self.column_metadatas.iter_mut() {
819            if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
820                for option in options {
821                    Self::set_index(column_metadata, option)?;
822                }
823            }
824        }
825
826        Ok(())
827    }
828
829    fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
830        let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
831        for option in &options {
832            unset_index_map
833                .entry(option.column_name())
834                .or_default()
835                .push(option);
836        }
837
838        for column_metadata in self.column_metadatas.iter_mut() {
839            if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
840                for option in options {
841                    Self::unset_index(column_metadata, option)?;
842                }
843            }
844        }
845
846        Ok(())
847    }
848
849    fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
850        match options {
851            SetIndexOption::Fulltext {
852                column_name,
853                options,
854            } => {
855                ensure!(
856                    column_metadata.column_schema.data_type.is_string(),
857                    InvalidColumnOptionSnafu {
858                        column_name,
859                        msg: "FULLTEXT index only supports string type".to_string(),
860                    }
861                );
862                let current_fulltext_options = column_metadata
863                    .column_schema
864                    .fulltext_options()
865                    .with_context(|_| GetFulltextOptionsSnafu {
866                        column_name: column_name.clone(),
867                    })?;
868                set_column_fulltext_options(
869                    column_metadata,
870                    column_name,
871                    options,
872                    current_fulltext_options,
873                )?;
874            }
875            SetIndexOption::Inverted { .. } => {
876                column_metadata.column_schema.set_inverted_index(true)
877            }
878            SetIndexOption::Skipping {
879                column_name,
880                options,
881            } => {
882                column_metadata
883                    .column_schema
884                    .set_skipping_options(options)
885                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
886            }
887        }
888
889        Ok(())
890    }
891
892    fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
893        match options {
894            UnsetIndexOption::Fulltext { column_name } => {
895                ensure!(
896                    column_metadata.column_schema.data_type.is_string(),
897                    InvalidColumnOptionSnafu {
898                        column_name,
899                        msg: "FULLTEXT index only supports string type".to_string(),
900                    }
901                );
902
903                let current_fulltext_options = column_metadata
904                    .column_schema
905                    .fulltext_options()
906                    .with_context(|_| GetFulltextOptionsSnafu {
907                        column_name: column_name.clone(),
908                    })?;
909
910                unset_column_fulltext_options(
911                    column_metadata,
912                    column_name,
913                    current_fulltext_options,
914                )?;
915            }
916            UnsetIndexOption::Inverted { .. } => {
917                column_metadata.column_schema.set_inverted_index(false)
918            }
919            UnsetIndexOption::Skipping { column_name } => {
920                column_metadata
921                    .column_schema
922                    .unset_skipping_options()
923                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
924            }
925        }
926
927        Ok(())
928    }
929
930    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
931        for name in column_names.iter() {
932            let meta = self
933                .column_metadatas
934                .iter_mut()
935                .find(|col| col.column_schema.name == *name);
936            if let Some(meta) = meta {
937                if !meta.column_schema.is_nullable() {
938                    return InvalidRegionRequestSnafu {
939                        region_id: self.region_id,
940                        err: format!(
941                            "column {name} is not nullable and `default` cannot be dropped",
942                        ),
943                    }
944                    .fail();
945                }
946                meta.column_schema = meta
947                    .column_schema
948                    .clone()
949                    .with_default_constraint(None)
950                    .with_context(|_| CastDefaultValueSnafu {
951                        reason: format!("Failed to drop default : {name:?}"),
952                    })?;
953            } else {
954                return InvalidRegionRequestSnafu {
955                    region_id: self.region_id,
956                    err: format!("column {name} not found",),
957                }
958                .fail();
959            }
960        }
961        Ok(())
962    }
963
964    fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
965        for set_default in set_defaults.iter() {
966            let meta = self
967                .column_metadatas
968                .iter_mut()
969                .find(|col| col.column_schema.name == set_default.name);
970            if let Some(meta) = meta {
971                let default_constraint = common_sql::convert::deserialize_default_constraint(
972                    set_default.default_constraint.as_slice(),
973                    &meta.column_schema.name,
974                    &meta.column_schema.data_type,
975                )
976                .context(SqlCommonSnafu)?;
977
978                meta.column_schema = meta
979                    .column_schema
980                    .clone()
981                    .with_default_constraint(default_constraint)
982                    .with_context(|_| CastDefaultValueSnafu {
983                        reason: format!("Failed to set default : {set_default:?}"),
984                    })?;
985            } else {
986                return InvalidRegionRequestSnafu {
987                    region_id: self.region_id,
988                    err: format!("column {} not found", set_default.name),
989                }
990                .fail();
991            }
992        }
993        Ok(())
994    }
995}
996
/// Fields skipped in serialization.
///
/// All of them are derived from the column metadatas by [SkippedFields::new].
struct SkippedFields {
    /// Schema built from the column schemas of all columns, in column order.
    schema: SchemaRef,
    /// Id of the time index column, i.e. the first column whose semantic type is `Timestamp`.
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    id_to_index: HashMap<ColumnId, usize>,
}
1006
1007impl SkippedFields {
1008    /// Constructs skipped fields from `column_metadatas`.
1009    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
1010        let column_schemas = column_metadatas
1011            .iter()
1012            .map(|column_metadata| column_metadata.column_schema.clone())
1013            .collect();
1014        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
1015        let time_index = column_metadatas
1016            .iter()
1017            .find_map(|col| {
1018                if col.semantic_type == SemanticType::Timestamp {
1019                    Some(col.column_id)
1020                } else {
1021                    None
1022                }
1023            })
1024            .context(InvalidMetaSnafu {
1025                reason: "time index not found",
1026            })?;
1027        let id_to_index = column_metadatas
1028            .iter()
1029            .enumerate()
1030            .map(|(idx, col)| (col.column_id, idx))
1031            .collect();
1032
1033        Ok(SkippedFields {
1034            schema,
1035            time_index,
1036            id_to_index,
1037        })
1038    }
1039}
1040
1041#[derive(Snafu)]
1042#[snafu(visibility(pub))]
1043#[stack_trace_debug]
1044pub enum MetadataError {
1045    #[snafu(display("Invalid schema"))]
1046    InvalidSchema {
1047        source: datatypes::error::Error,
1048        #[snafu(implicit)]
1049        location: Location,
1050    },
1051
1052    #[snafu(display("Invalid metadata, {}", reason))]
1053    InvalidMeta {
1054        reason: String,
1055        #[snafu(implicit)]
1056        location: Location,
1057    },
1058
1059    #[snafu(display("Failed to ser/de json object"))]
1060    SerdeJson {
1061        #[snafu(implicit)]
1062        location: Location,
1063        #[snafu(source)]
1064        error: serde_json::Error,
1065    },
1066
1067    #[snafu(display("Invalid raw region request, err: {}", err))]
1068    InvalidRawRegionRequest {
1069        err: String,
1070        #[snafu(implicit)]
1071        location: Location,
1072    },
1073
1074    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1075    InvalidRegionRequest {
1076        region_id: RegionId,
1077        err: String,
1078        #[snafu(implicit)]
1079        location: Location,
1080    },
1081
1082    #[snafu(display("Unexpected schema error during project"))]
1083    SchemaProject {
1084        origin_schema: SchemaRef,
1085        projection: Vec<ColumnId>,
1086        #[snafu(implicit)]
1087        location: Location,
1088        source: datatypes::Error,
1089    },
1090
1091    #[snafu(display("Time index column not found"))]
1092    TimeIndexNotFound {
1093        #[snafu(implicit)]
1094        location: Location,
1095    },
1096
1097    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1098    ChangeColumnNotFound {
1099        column_name: String,
1100        region_id: RegionId,
1101        #[snafu(implicit)]
1102        location: Location,
1103    },
1104
1105    #[snafu(display("Failed to convert column schema"))]
1106    ConvertColumnSchema {
1107        source: api::error::Error,
1108        #[snafu(implicit)]
1109        location: Location,
1110    },
1111
1112    #[snafu(display("Failed to convert TimeRanges"))]
1113    ConvertTimeRanges {
1114        source: api::error::Error,
1115        #[snafu(implicit)]
1116        location: Location,
1117    },
1118
1119    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1120    InvalidSetRegionOptionRequest {
1121        key: String,
1122        value: String,
1123        #[snafu(implicit)]
1124        location: Location,
1125    },
1126
1127    #[snafu(display("Invalid set region option request, key: {}", key))]
1128    InvalidUnsetRegionOptionRequest {
1129        key: String,
1130        #[snafu(implicit)]
1131        location: Location,
1132    },
1133
1134    #[snafu(display("Failed to decode protobuf"))]
1135    DecodeProto {
1136        #[snafu(source)]
1137        error: prost::UnknownEnumValue,
1138        #[snafu(implicit)]
1139        location: Location,
1140    },
1141
1142    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1143    InvalidColumnOption {
1144        column_name: String,
1145        msg: String,
1146        #[snafu(implicit)]
1147        location: Location,
1148    },
1149
1150    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1151    SetFulltextOptions {
1152        column_name: String,
1153        source: datatypes::Error,
1154        #[snafu(implicit)]
1155        location: Location,
1156    },
1157
1158    #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1159    GetFulltextOptions {
1160        column_name: String,
1161        source: datatypes::Error,
1162        #[snafu(implicit)]
1163        location: Location,
1164    },
1165
1166    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1167    SetSkippingIndexOptions {
1168        column_name: String,
1169        source: datatypes::Error,
1170        #[snafu(implicit)]
1171        location: Location,
1172    },
1173
1174    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1175    UnsetSkippingIndexOptions {
1176        column_name: String,
1177        source: datatypes::Error,
1178        #[snafu(implicit)]
1179        location: Location,
1180    },
1181
1182    #[snafu(display("Failed to decode arrow ipc record batches"))]
1183    DecodeArrowIpc {
1184        #[snafu(source)]
1185        error: arrow::error::ArrowError,
1186        #[snafu(implicit)]
1187        location: Location,
1188    },
1189
1190    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1191    CastDefaultValue {
1192        reason: String,
1193        source: datatypes::Error,
1194        #[snafu(implicit)]
1195        location: Location,
1196    },
1197
1198    #[snafu(display("Unexpected: {}", reason))]
1199    Unexpected {
1200        reason: String,
1201        #[snafu(implicit)]
1202        location: Location,
1203    },
1204
1205    #[snafu(display("Failed to encode/decode flight message"))]
1206    FlightCodec {
1207        source: common_grpc::Error,
1208        #[snafu(implicit)]
1209        location: Location,
1210    },
1211
1212    #[snafu(display("Invalid index option"))]
1213    InvalidIndexOption {
1214        #[snafu(implicit)]
1215        location: Location,
1216        #[snafu(source)]
1217        error: datatypes::error::Error,
1218    },
1219
1220    #[snafu(display("Sql common error"))]
1221    SqlCommon {
1222        source: common_sql::error::Error,
1223        #[snafu(implicit)]
1224        location: Location,
1225    },
1226}
1227
1228impl ErrorExt for MetadataError {
1229    fn status_code(&self) -> StatusCode {
1230        match self {
1231            Self::SqlCommon { source, .. } => source.status_code(),
1232            _ => StatusCode::InvalidArguments,
1233        }
1234    }
1235
1236    fn as_any(&self) -> &dyn Any {
1237        self
1238    }
1239}
1240
1241/// Set column fulltext options if it passed the validation.
1242///
1243/// Options allowed to modify:
1244/// * backend
1245///
1246/// Options not allowed to modify:
1247/// * analyzer
1248/// * case_sensitive
1249fn set_column_fulltext_options(
1250    column_meta: &mut ColumnMetadata,
1251    column_name: &str,
1252    options: &FulltextOptions,
1253    current_options: Option<FulltextOptions>,
1254) -> Result<()> {
1255    if let Some(current_options) = current_options {
1256        ensure!(
1257            current_options.analyzer == options.analyzer
1258                && current_options.case_sensitive == options.case_sensitive,
1259            InvalidColumnOptionSnafu {
1260                column_name,
1261                msg: format!(
1262                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1263                    current_options.analyzer, current_options.case_sensitive
1264                ),
1265            }
1266        );
1267    }
1268
1269    column_meta
1270        .column_schema
1271        .set_fulltext_options(options)
1272        .context(SetFulltextOptionsSnafu { column_name })?;
1273
1274    Ok(())
1275}
1276
1277fn unset_column_fulltext_options(
1278    column_meta: &mut ColumnMetadata,
1279    column_name: &str,
1280    current_options: Option<FulltextOptions>,
1281) -> Result<()> {
1282    if let Some(mut current_options) = current_options
1283        && current_options.enable
1284    {
1285        current_options.enable = false;
1286        column_meta
1287            .column_schema
1288            .set_fulltext_options(&current_options)
1289            .context(SetFulltextOptionsSnafu { column_name })?;
1290    } else {
1291        return InvalidColumnOptionSnafu {
1292            column_name,
1293            msg: "FULLTEXT index already disabled",
1294        }
1295        .fail();
1296    }
1297
1298    Ok(())
1299}
1300
1301#[cfg(test)]
1302mod test {
1303    use datatypes::prelude::ConcreteDataType;
1304    use datatypes::schema::{
1305        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1306    };
1307    use datatypes::value::Value;
1308
1309    use super::*;
1310
1311    fn create_builder() -> RegionMetadataBuilder {
1312        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1313    }
1314
1315    fn build_test_region_metadata() -> RegionMetadata {
1316        let mut builder = create_builder();
1317        builder
1318            .push_column_metadata(ColumnMetadata {
1319                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1320                semantic_type: SemanticType::Tag,
1321                column_id: 1,
1322            })
1323            .push_column_metadata(ColumnMetadata {
1324                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1325                semantic_type: SemanticType::Field,
1326                column_id: 2,
1327            })
1328            .push_column_metadata(ColumnMetadata {
1329                column_schema: ColumnSchema::new(
1330                    "c",
1331                    ConcreteDataType::timestamp_millisecond_datatype(),
1332                    false,
1333                ),
1334                semantic_type: SemanticType::Timestamp,
1335                column_id: 3,
1336            })
1337            .primary_key(vec![1])
1338            .partition_expr_json(Some("".to_string()));
1339        builder.build().unwrap()
1340    }
1341
1342    #[test]
1343    fn test_region_metadata() {
1344        let region_metadata = build_test_region_metadata();
1345        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1346        assert_eq!(
1347            "a",
1348            region_metadata.column_by_id(1).unwrap().column_schema.name
1349        );
1350        assert_eq!(None, region_metadata.column_by_id(10));
1351    }
1352
1353    #[test]
1354    fn test_region_metadata_serde() {
1355        let region_metadata = build_test_region_metadata();
1356        let serialized = serde_json::to_string(&region_metadata).unwrap();
1357        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1358        assert_eq!(region_metadata, deserialized);
1359    }
1360
1361    #[test]
1362    fn test_column_metadata_validate() {
1363        let mut builder = create_builder();
1364        let col = ColumnMetadata {
1365            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1366            semantic_type: SemanticType::Timestamp,
1367            column_id: 1,
1368        };
1369
1370        builder.push_column_metadata(col);
1371        let err = builder.build().unwrap_err();
1372        assert!(
1373            err.to_string()
1374                .contains("column `ts` is not timestamp type"),
1375            "unexpected err: {err}",
1376        );
1377    }
1378
1379    #[test]
1380    fn test_empty_region_metadata() {
1381        let builder = create_builder();
1382        let err = builder.build().unwrap_err();
1383        // A region must have a time index.
1384        assert!(
1385            err.to_string().contains("time index not found"),
1386            "unexpected err: {err}",
1387        );
1388    }
1389
1390    #[test]
1391    fn test_same_column_id() {
1392        let mut builder = create_builder();
1393        builder
1394            .push_column_metadata(ColumnMetadata {
1395                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1396                semantic_type: SemanticType::Tag,
1397                column_id: 1,
1398            })
1399            .push_column_metadata(ColumnMetadata {
1400                column_schema: ColumnSchema::new(
1401                    "b",
1402                    ConcreteDataType::timestamp_millisecond_datatype(),
1403                    false,
1404                ),
1405                semantic_type: SemanticType::Timestamp,
1406                column_id: 1,
1407            });
1408        let err = builder.build().unwrap_err();
1409        assert!(
1410            err.to_string()
1411                .contains("column a and b have the same column id"),
1412            "unexpected err: {err}",
1413        );
1414    }
1415
1416    #[test]
1417    fn test_duplicate_time_index() {
1418        let mut builder = create_builder();
1419        builder
1420            .push_column_metadata(ColumnMetadata {
1421                column_schema: ColumnSchema::new(
1422                    "a",
1423                    ConcreteDataType::timestamp_millisecond_datatype(),
1424                    false,
1425                ),
1426                semantic_type: SemanticType::Timestamp,
1427                column_id: 1,
1428            })
1429            .push_column_metadata(ColumnMetadata {
1430                column_schema: ColumnSchema::new(
1431                    "b",
1432                    ConcreteDataType::timestamp_millisecond_datatype(),
1433                    false,
1434                ),
1435                semantic_type: SemanticType::Timestamp,
1436                column_id: 2,
1437            });
1438        let err = builder.build().unwrap_err();
1439        assert!(
1440            err.to_string().contains("expect only one time index"),
1441            "unexpected err: {err}",
1442        );
1443    }
1444
1445    #[test]
1446    fn test_unknown_primary_key() {
1447        let mut builder = create_builder();
1448        builder
1449            .push_column_metadata(ColumnMetadata {
1450                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1451                semantic_type: SemanticType::Tag,
1452                column_id: 1,
1453            })
1454            .push_column_metadata(ColumnMetadata {
1455                column_schema: ColumnSchema::new(
1456                    "b",
1457                    ConcreteDataType::timestamp_millisecond_datatype(),
1458                    false,
1459                ),
1460                semantic_type: SemanticType::Timestamp,
1461                column_id: 2,
1462            })
1463            .primary_key(vec![3]);
1464        let err = builder.build().unwrap_err();
1465        assert!(
1466            err.to_string().contains("unknown column id 3"),
1467            "unexpected err: {err}",
1468        );
1469    }
1470
1471    #[test]
1472    fn test_same_primary_key() {
1473        let mut builder = create_builder();
1474        builder
1475            .push_column_metadata(ColumnMetadata {
1476                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1477                semantic_type: SemanticType::Tag,
1478                column_id: 1,
1479            })
1480            .push_column_metadata(ColumnMetadata {
1481                column_schema: ColumnSchema::new(
1482                    "b",
1483                    ConcreteDataType::timestamp_millisecond_datatype(),
1484                    false,
1485                ),
1486                semantic_type: SemanticType::Timestamp,
1487                column_id: 2,
1488            })
1489            .primary_key(vec![1, 1]);
1490        let err = builder.build().unwrap_err();
1491        assert!(
1492            err.to_string()
1493                .contains("duplicate column a in primary key"),
1494            "unexpected err: {err}",
1495        );
1496    }
1497
1498    #[test]
1499    fn test_in_time_index() {
1500        let mut builder = create_builder();
1501        builder
1502            .push_column_metadata(ColumnMetadata {
1503                column_schema: ColumnSchema::new(
1504                    "ts",
1505                    ConcreteDataType::timestamp_millisecond_datatype(),
1506                    false,
1507                ),
1508                semantic_type: SemanticType::Timestamp,
1509                column_id: 1,
1510            })
1511            .primary_key(vec![1]);
1512        let err = builder.build().unwrap_err();
1513        assert!(
1514            err.to_string()
1515                .contains("column ts is already a time index column"),
1516            "unexpected err: {err}",
1517        );
1518    }
1519
1520    #[test]
1521    fn test_nullable_time_index() {
1522        let mut builder = create_builder();
1523        builder.push_column_metadata(ColumnMetadata {
1524            column_schema: ColumnSchema::new(
1525                "ts",
1526                ConcreteDataType::timestamp_millisecond_datatype(),
1527                true,
1528            ),
1529            semantic_type: SemanticType::Timestamp,
1530            column_id: 1,
1531        });
1532        let err = builder.build().unwrap_err();
1533        assert!(
1534            err.to_string()
1535                .contains("time index column ts must be NOT NULL"),
1536            "unexpected err: {err}",
1537        );
1538    }
1539
1540    #[test]
1541    fn test_primary_key_semantic_type() {
1542        let mut builder = create_builder();
1543        builder
1544            .push_column_metadata(ColumnMetadata {
1545                column_schema: ColumnSchema::new(
1546                    "ts",
1547                    ConcreteDataType::timestamp_millisecond_datatype(),
1548                    false,
1549                ),
1550                semantic_type: SemanticType::Timestamp,
1551                column_id: 1,
1552            })
1553            .push_column_metadata(ColumnMetadata {
1554                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1555                semantic_type: SemanticType::Field,
1556                column_id: 2,
1557            })
1558            .primary_key(vec![2]);
1559        let err = builder.build().unwrap_err();
1560        assert!(
1561            err.to_string()
1562                .contains("semantic type of column a should be Tag, not Field"),
1563            "unexpected err: {err}",
1564        );
1565    }
1566
1567    #[test]
1568    fn test_primary_key_tag_num() {
1569        let mut builder = create_builder();
1570        builder
1571            .push_column_metadata(ColumnMetadata {
1572                column_schema: ColumnSchema::new(
1573                    "ts",
1574                    ConcreteDataType::timestamp_millisecond_datatype(),
1575                    false,
1576                ),
1577                semantic_type: SemanticType::Timestamp,
1578                column_id: 1,
1579            })
1580            .push_column_metadata(ColumnMetadata {
1581                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1582                semantic_type: SemanticType::Tag,
1583                column_id: 2,
1584            })
1585            .push_column_metadata(ColumnMetadata {
1586                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1587                semantic_type: SemanticType::Tag,
1588                column_id: 3,
1589            })
1590            .primary_key(vec![2]);
1591        let err = builder.build().unwrap_err();
1592        assert!(
1593            err.to_string()
1594                .contains("number of primary key columns 1 not equal to tag columns 2"),
1595            "unexpected err: {err}",
1596        );
1597    }
1598
1599    #[test]
1600    fn test_bump_version() {
1601        let mut region_metadata = build_test_region_metadata();
1602        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1603        builder.bump_version();
1604        let new_meta = builder.build().unwrap();
1605        region_metadata.schema_version += 1;
1606        assert_eq!(region_metadata, new_meta);
1607    }
1608
1609    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1610        let semantic_type = if is_tag {
1611            SemanticType::Tag
1612        } else {
1613            SemanticType::Field
1614        };
1615        ColumnMetadata {
1616            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1617            semantic_type,
1618            column_id,
1619        }
1620    }
1621
1622    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1623        let actual: Vec<_> = metadata
1624            .column_metadatas
1625            .iter()
1626            .map(|col| &col.column_schema.name)
1627            .collect();
1628        assert_eq!(names, actual);
1629    }
1630
1631    fn get_columns_default_constraint(
1632        metadata: &RegionMetadata,
1633        name: String,
1634    ) -> Option<Option<&ColumnDefaultConstraint>> {
1635        metadata.column_metadatas.iter().find_map(|col| {
1636            if col.column_schema.name == name {
1637                Some(col.column_schema.default_constraint())
1638            } else {
1639                None
1640            }
1641        })
1642    }
1643
1644    #[test]
1645    fn test_alter() {
1646        // a (tag), b (field), c (ts)
1647        let metadata = build_test_region_metadata();
1648        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1649        // tag d
1650        builder
1651            .alter(AlterKind::AddColumns {
1652                columns: vec![AddColumn {
1653                    column_metadata: new_column_metadata("d", true, 4),
1654                    location: None,
1655                }],
1656            })
1657            .unwrap();
1658        let metadata = builder.build().unwrap();
1659        check_columns(&metadata, &["a", "b", "c", "d"]);
1660        assert_eq!([1, 4], &metadata.primary_key[..]);
1661
1662        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1663        builder
1664            .alter(AlterKind::AddColumns {
1665                columns: vec![AddColumn {
1666                    column_metadata: new_column_metadata("e", false, 5),
1667                    location: Some(AddColumnLocation::First),
1668                }],
1669            })
1670            .unwrap();
1671        let metadata = builder.build().unwrap();
1672        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1673
1674        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1675        builder
1676            .alter(AlterKind::AddColumns {
1677                columns: vec![AddColumn {
1678                    column_metadata: new_column_metadata("f", false, 6),
1679                    location: Some(AddColumnLocation::After {
1680                        column_name: "b".to_string(),
1681                    }),
1682                }],
1683            })
1684            .unwrap();
1685        let metadata = builder.build().unwrap();
1686        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1687
1688        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1689        builder
1690            .alter(AlterKind::AddColumns {
1691                columns: vec![AddColumn {
1692                    column_metadata: new_column_metadata("g", false, 7),
1693                    location: Some(AddColumnLocation::After {
1694                        column_name: "d".to_string(),
1695                    }),
1696                }],
1697            })
1698            .unwrap();
1699        let metadata = builder.build().unwrap();
1700        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1701
1702        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1703        builder
1704            .alter(AlterKind::DropColumns {
1705                names: vec!["g".to_string(), "e".to_string()],
1706            })
1707            .unwrap();
1708        let metadata = builder.build().unwrap();
1709        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1710
1711        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1712        builder
1713            .alter(AlterKind::DropColumns {
1714                names: vec!["a".to_string()],
1715            })
1716            .unwrap();
1717        // Build returns error as the primary key contains a.
1718        let err = builder.build().unwrap_err();
1719        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1720
1721        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1722        let mut column_metadata = new_column_metadata("g", false, 8);
1723        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1724        column_metadata.column_schema = column_metadata
1725            .column_schema
1726            .with_default_constraint(default_constraint.clone())
1727            .unwrap();
1728        builder
1729            .alter(AlterKind::AddColumns {
1730                columns: vec![AddColumn {
1731                    column_metadata,
1732                    location: None,
1733                }],
1734            })
1735            .unwrap();
1736        let metadata = builder.build().unwrap();
1737        assert_eq!(
1738            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1739            default_constraint.as_ref()
1740        );
1741        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1742
1743        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1744        builder
1745            .alter(AlterKind::DropDefaults {
1746                names: vec!["g".to_string()],
1747            })
1748            .unwrap();
1749        let metadata = builder.build().unwrap();
1750        assert_eq!(
1751            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1752            None
1753        );
1754        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1755
1756        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1757        builder
1758            .alter(AlterKind::DropColumns {
1759                names: vec!["g".to_string()],
1760            })
1761            .unwrap();
1762        let metadata = builder.build().unwrap();
1763        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1764
1765        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1766        builder
1767            .alter(AlterKind::ModifyColumnTypes {
1768                columns: vec![ModifyColumnType {
1769                    column_name: "b".to_string(),
1770                    target_type: ConcreteDataType::string_datatype(),
1771                }],
1772            })
1773            .unwrap();
1774        let metadata = builder.build().unwrap();
1775        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1776        let b_type = &metadata
1777            .column_by_name("b")
1778            .unwrap()
1779            .column_schema
1780            .data_type;
1781        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1782
1783        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1784        builder
1785            .alter(AlterKind::SetIndexes {
1786                options: vec![SetIndexOption::Fulltext {
1787                    column_name: "b".to_string(),
1788                    options: FulltextOptions::new_unchecked(
1789                        true,
1790                        FulltextAnalyzer::Chinese,
1791                        true,
1792                        FulltextBackend::Bloom,
1793                        1000,
1794                        0.01,
1795                    ),
1796                }],
1797            })
1798            .unwrap();
1799        let metadata = builder.build().unwrap();
1800        let a_fulltext_options = metadata
1801            .column_by_name("b")
1802            .unwrap()
1803            .column_schema
1804            .fulltext_options()
1805            .unwrap()
1806            .unwrap();
1807        assert!(a_fulltext_options.enable);
1808        assert_eq!(
1809            datatypes::schema::FulltextAnalyzer::Chinese,
1810            a_fulltext_options.analyzer
1811        );
1812        assert!(a_fulltext_options.case_sensitive);
1813
1814        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1815        builder
1816            .alter(AlterKind::UnsetIndexes {
1817                options: vec![UnsetIndexOption::Fulltext {
1818                    column_name: "b".to_string(),
1819                }],
1820            })
1821            .unwrap();
1822        let metadata = builder.build().unwrap();
1823        let a_fulltext_options = metadata
1824            .column_by_name("b")
1825            .unwrap()
1826            .column_schema
1827            .fulltext_options()
1828            .unwrap()
1829            .unwrap();
1830        assert!(!a_fulltext_options.enable);
1831        assert_eq!(
1832            datatypes::schema::FulltextAnalyzer::Chinese,
1833            a_fulltext_options.analyzer
1834        );
1835        assert!(a_fulltext_options.case_sensitive);
1836    }
1837
1838    #[test]
1839    fn test_add_if_not_exists() {
1840        // a (tag), b (field), c (ts)
1841        let metadata = build_test_region_metadata();
1842        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1843        // tag d
1844        builder
1845            .alter(AlterKind::AddColumns {
1846                columns: vec![
1847                    AddColumn {
1848                        column_metadata: new_column_metadata("d", true, 4),
1849                        location: None,
1850                    },
1851                    AddColumn {
1852                        column_metadata: new_column_metadata("d", true, 4),
1853                        location: None,
1854                    },
1855                ],
1856            })
1857            .unwrap();
1858        let metadata = builder.build().unwrap();
1859        check_columns(&metadata, &["a", "b", "c", "d"]);
1860        assert_eq!([1, 4], &metadata.primary_key[..]);
1861
1862        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1863        // field b.
1864        builder
1865            .alter(AlterKind::AddColumns {
1866                columns: vec![AddColumn {
1867                    column_metadata: new_column_metadata("b", false, 2),
1868                    location: None,
1869                }],
1870            })
1871            .unwrap();
1872        let metadata = builder.build().unwrap();
1873        check_columns(&metadata, &["a", "b", "c", "d"]);
1874    }
1875
1876    #[test]
1877    fn test_add_column_with_inverted_index() {
1878        // only set inverted index to true explicitly will this column be inverted indexed
1879
1880        // a (tag), b (field), c (ts)
1881        let metadata = build_test_region_metadata();
1882        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1883        // tag d, e
1884        let mut col = new_column_metadata("d", true, 4);
1885        col.column_schema.set_inverted_index(true);
1886        builder
1887            .alter(AlterKind::AddColumns {
1888                columns: vec![
1889                    AddColumn {
1890                        column_metadata: col,
1891                        location: None,
1892                    },
1893                    AddColumn {
1894                        column_metadata: new_column_metadata("e", true, 5),
1895                        location: None,
1896                    },
1897                ],
1898            })
1899            .unwrap();
1900        let metadata = builder.build().unwrap();
1901        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1902        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1903        let column_metadata = metadata.column_by_name("a").unwrap();
1904        assert!(!column_metadata.column_schema.is_inverted_indexed());
1905        let column_metadata = metadata.column_by_name("b").unwrap();
1906        assert!(!column_metadata.column_schema.is_inverted_indexed());
1907        let column_metadata = metadata.column_by_name("c").unwrap();
1908        assert!(!column_metadata.column_schema.is_inverted_indexed());
1909        let column_metadata = metadata.column_by_name("d").unwrap();
1910        assert!(column_metadata.column_schema.is_inverted_indexed());
1911        let column_metadata = metadata.column_by_name("e").unwrap();
1912        assert!(!column_metadata.column_schema.is_inverted_indexed());
1913    }
1914
1915    #[test]
1916    fn test_drop_if_exists() {
1917        // a (tag), b (field), c (ts)
1918        let metadata = build_test_region_metadata();
1919        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1920        // field d, e
1921        builder
1922            .alter(AlterKind::AddColumns {
1923                columns: vec![
1924                    AddColumn {
1925                        column_metadata: new_column_metadata("d", false, 4),
1926                        location: None,
1927                    },
1928                    AddColumn {
1929                        column_metadata: new_column_metadata("e", false, 5),
1930                        location: None,
1931                    },
1932                ],
1933            })
1934            .unwrap();
1935        let metadata = builder.build().unwrap();
1936        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1937
1938        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1939        builder
1940            .alter(AlterKind::DropColumns {
1941                names: vec!["b".to_string(), "b".to_string()],
1942            })
1943            .unwrap();
1944        let metadata = builder.build().unwrap();
1945        check_columns(&metadata, &["a", "c", "d", "e"]);
1946
1947        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1948        builder
1949            .alter(AlterKind::DropColumns {
1950                names: vec!["b".to_string(), "e".to_string()],
1951            })
1952            .unwrap();
1953        let metadata = builder.build().unwrap();
1954        check_columns(&metadata, &["a", "c", "d"]);
1955    }
1956
1957    #[test]
1958    fn test_invalid_column_name() {
1959        let mut builder = create_builder();
1960        builder.push_column_metadata(ColumnMetadata {
1961            column_schema: ColumnSchema::new(
1962                "__sequence",
1963                ConcreteDataType::timestamp_millisecond_datatype(),
1964                false,
1965            ),
1966            semantic_type: SemanticType::Timestamp,
1967            column_id: 1,
1968        });
1969        let err = builder.build().unwrap_err();
1970        assert!(
1971            err.to_string()
1972                .contains("internal column name that can not be used"),
1973            "unexpected err: {err}",
1974        );
1975    }
1976
1977    #[test]
1978    fn test_allow_internal_column_name() {
1979        let mut builder = create_builder();
1980        builder
1981            .push_column_metadata(ColumnMetadata {
1982                column_schema: ColumnSchema::new(
1983                    "__primary_key",
1984                    ConcreteDataType::string_datatype(),
1985                    false,
1986                ),
1987                semantic_type: SemanticType::Tag,
1988                column_id: 1,
1989            })
1990            .push_column_metadata(ColumnMetadata {
1991                column_schema: ColumnSchema::new(
1992                    "ts",
1993                    ConcreteDataType::timestamp_millisecond_datatype(),
1994                    false,
1995                ),
1996                semantic_type: SemanticType::Timestamp,
1997                column_id: 2,
1998            })
1999            .primary_key(vec![1]);
2000
2001        let metadata = builder.build_without_validation().unwrap();
2002        assert_eq!(
2003            "__primary_key",
2004            metadata.column_metadatas[0].column_schema.name
2005        );
2006    }
2007
2008    #[test]
2009    fn test_build_without_validation() {
2010        // Primary key points to a Field column, which would normally fail validation.
2011        let mut builder = create_builder();
2012        builder
2013            .push_column_metadata(ColumnMetadata {
2014                column_schema: ColumnSchema::new(
2015                    "ts",
2016                    ConcreteDataType::timestamp_millisecond_datatype(),
2017                    false,
2018                ),
2019                semantic_type: SemanticType::Timestamp,
2020                column_id: 1,
2021            })
2022            .push_column_metadata(ColumnMetadata {
2023                column_schema: ColumnSchema::new(
2024                    "field",
2025                    ConcreteDataType::string_datatype(),
2026                    true,
2027                ),
2028                semantic_type: SemanticType::Field,
2029                column_id: 2,
2030            })
2031            .primary_key(vec![2]);
2032
2033        // Unvalidated build should succeed.
2034        let metadata = builder.build_without_validation().unwrap();
2035        assert_eq!(vec![2], metadata.primary_key);
2036
2037        // Validated build still rejects it.
2038        let mut builder = create_builder();
2039        builder
2040            .push_column_metadata(ColumnMetadata {
2041                column_schema: ColumnSchema::new(
2042                    "ts",
2043                    ConcreteDataType::timestamp_millisecond_datatype(),
2044                    false,
2045                ),
2046                semantic_type: SemanticType::Timestamp,
2047                column_id: 1,
2048            })
2049            .push_column_metadata(ColumnMetadata {
2050                column_schema: ColumnSchema::new(
2051                    "field",
2052                    ConcreteDataType::string_datatype(),
2053                    true,
2054                ),
2055                semantic_type: SemanticType::Field,
2056                column_id: 2,
2057            })
2058            .primary_key(vec![2]);
2059        let err = builder.build().unwrap_err();
2060        assert!(
2061            err.to_string()
2062                .contains("semantic type of column field should be Tag"),
2063            "unexpected err: {err}"
2064        );
2065    }
2066
2067    #[test]
2068    fn test_debug_for_column_metadata() {
2069        let region_metadata = build_test_region_metadata();
2070        let formatted = format!("{:?}", region_metadata);
2071        assert_eq!(
2072            formatted,
2073            "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
2074        );
2075    }
2076
2077    #[test]
2078    fn test_region_metadata_deserialize_default_primary_key_encoding() {
2079        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
2080        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2081        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
2082
2083        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
2084        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2085        assert_eq!(
2086            deserialized.primary_key_encoding,
2087            PrimaryKeyEncoding::Sparse
2088        );
2089    }
2090}