store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
33use datatypes::types::TimestampType;
34use itertools::Itertools;
35use serde::de::Error;
36use serde::{Deserialize, Deserializer, Serialize};
37use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
38
39use crate::codec::PrimaryKeyEncoding;
40use crate::region_request::{
41    AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type used throughout this module; the error is always a [MetadataError].
pub type Result<T> = std::result::Result<T, MetadataError>;
47
48/// Metadata of a column.
49#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ColumnMetadata {
51    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
52    pub column_schema: ColumnSchema,
53    /// Semantic type of this column (e.g. tag or timestamp).
54    pub semantic_type: SemanticType,
55    /// Immutable and unique id of a region.
56    pub column_id: ColumnId,
57}
58
59impl fmt::Debug for ColumnMetadata {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(
62            f,
63            "[{:?} {:?} {:?}]",
64            self.column_schema, self.semantic_type, self.column_id,
65        )
66    }
67}
68
69impl ColumnMetadata {
70    /// Construct `Self` from protobuf struct [RegionColumnDef]
71    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72        let column_id = column_def.column_id;
73        let column_def = column_def
74            .column_def
75            .context(InvalidRawRegionRequestSnafu {
76                err: "column_def is absent",
77            })?;
78        let semantic_type = column_def.semantic_type();
79        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81        Ok(Self {
82            column_schema,
83            semantic_type,
84            column_id,
85        })
86    }
87
88    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
89    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90        serde_json::to_vec(columns)
91    }
92
93    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
94    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95        serde_json::from_slice(bytes)
96    }
97
98    pub fn is_same_datatype(&self, other: &Self) -> bool {
99        self.column_schema.data_type == other.column_schema.data_type
100    }
101}
102
103#[cfg_attr(doc, aquamarine::aquamarine)]
104/// General static metadata of a region.
105///
106/// This struct implements [Serialize] and [Deserialize] traits.
107/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
108///
109/// ```mermaid
110/// class RegionMetadata {
111///     +RegionId region_id
112///     +SchemaRef schema
113///     +Vec&lt;ColumnMetadata&gt; column_metadatas
114///     +Vec&lt;ColumnId&gt; primary_key
115/// }
116/// class Schema
117/// class ColumnMetadata {
118///     +ColumnSchema column_schema
119///     +SemanticTyle semantic_type
120///     +ColumnId column_id
121/// }
122/// class SemanticType
123/// RegionMetadata o-- Schema
124/// RegionMetadata o-- ColumnMetadata
125/// ColumnMetadata o-- SemanticType
126/// ```
127#[derive(Clone, PartialEq, Eq, Serialize)]
128pub struct RegionMetadata {
129    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
130    #[serde(skip)]
131    pub schema: SchemaRef,
132
133    // We don't pub `time_index` and `id_to_index` and always construct them via [SkippedFields]
134    // so we can assumes they are valid.
135    /// Id of the time index column.
136    #[serde(skip)]
137    time_index: ColumnId,
138    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
139    #[serde(skip)]
140    id_to_index: HashMap<ColumnId, usize>,
141
142    /// Columns in the region. Has the same order as columns
143    /// in [schema](RegionMetadata::schema).
144    pub column_metadatas: Vec<ColumnMetadata>,
145    /// Maintains an ordered list of primary keys
146    pub primary_key: Vec<ColumnId>,
147
148    /// Immutable and unique id of a region.
149    pub region_id: RegionId,
150    /// Current version of the region schema.
151    ///
152    /// The version starts from 0. Altering the schema bumps the version.
153    pub schema_version: u64,
154
155    /// Primary key encoding mode.
156    pub primary_key_encoding: PrimaryKeyEncoding,
157
158    /// Partition expression serialized as a JSON string.
159    /// Compatibility behavior:
160    /// - None: no partition expr was ever set in the manifest (legacy regions).
161    /// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is.
162    pub partition_expr: Option<String>,
163}
164
165impl fmt::Debug for RegionMetadata {
166    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167        f.debug_struct("RegionMetadata")
168            .field("column_metadatas", &self.column_metadatas)
169            .field("time_index", &self.time_index)
170            .field("primary_key", &self.primary_key)
171            .field("region_id", &self.region_id)
172            .field("schema_version", &self.schema_version)
173            .field("partition_expr", &self.partition_expr)
174            .finish()
175    }
176}
177
/// Reference-counted ([Arc]) pointer to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
179
180impl<'de> Deserialize<'de> for RegionMetadata {
181    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
182    where
183        D: Deserializer<'de>,
184    {
185        // helper internal struct for deserialization
186        #[derive(Deserialize)]
187        struct RegionMetadataWithoutSchema {
188            column_metadatas: Vec<ColumnMetadata>,
189            primary_key: Vec<ColumnId>,
190            region_id: RegionId,
191            schema_version: u64,
192            #[serde(default)]
193            primary_key_encoding: PrimaryKeyEncoding,
194            #[serde(default)]
195            partition_expr: Option<String>,
196        }
197
198        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
199        let skipped =
200            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
201
202        Ok(Self {
203            schema: skipped.schema,
204            time_index: skipped.time_index,
205            id_to_index: skipped.id_to_index,
206            column_metadatas: without_schema.column_metadatas,
207            primary_key: without_schema.primary_key,
208            region_id: without_schema.region_id,
209            schema_version: without_schema.schema_version,
210            primary_key_encoding: without_schema.primary_key_encoding,
211            partition_expr: without_schema.partition_expr,
212        })
213    }
214}
215
216impl RegionMetadata {
217    /// Decode the metadata from a JSON str.
218    pub fn from_json(s: &str) -> Result<Self> {
219        serde_json::from_str(s).context(SerdeJsonSnafu)
220    }
221
222    /// Encode the metadata to a JSON string.
223    pub fn to_json(&self) -> Result<String> {
224        serde_json::to_string(&self).context(SerdeJsonSnafu)
225    }
226
227    /// Find column by id.
228    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
229        self.id_to_index
230            .get(&column_id)
231            .map(|index| &self.column_metadatas[*index])
232    }
233
234    /// Find column index by id.
235    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
236        self.id_to_index.get(&column_id).copied()
237    }
238
239    /// Find column index by name.
240    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
241        self.column_metadatas
242            .iter()
243            .position(|col| col.column_schema.name == column_name)
244    }
245
246    /// Returns the time index column
247    ///
248    /// # Panics
249    /// Panics if the time index column id is invalid.
250    pub fn time_index_column(&self) -> &ColumnMetadata {
251        let index = self.id_to_index[&self.time_index];
252        &self.column_metadatas[index]
253    }
254
255    /// Returns timestamp type of time index column
256    ///
257    /// # Panics
258    /// Panics if the time index column id is invalid.
259    pub fn time_index_type(&self) -> TimestampType {
260        let index = self.id_to_index[&self.time_index];
261        self.column_metadatas[index]
262            .column_schema
263            .data_type
264            .as_timestamp()
265            .unwrap()
266    }
267
268    /// Returns the position of the time index.
269    pub fn time_index_column_pos(&self) -> usize {
270        self.id_to_index[&self.time_index]
271    }
272
273    /// Returns the arrow field of the time index column.
274    pub fn time_index_field(&self) -> FieldRef {
275        let index = self.id_to_index[&self.time_index];
276        self.schema.arrow_schema().fields[index].clone()
277    }
278
279    /// Finds a column by name.
280    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
281        self.schema
282            .column_index_by_name(name)
283            .map(|index| &self.column_metadatas[index])
284    }
285
286    /// Returns all primary key columns.
287    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288        // safety: RegionMetadata::validate ensures every primary key exists.
289        self.primary_key
290            .iter()
291            .map(|id| self.column_by_id(*id).unwrap())
292    }
293
294    /// Returns all field columns before projection.
295    ///
296    /// **Use with caution**. On read path where might have projection, this method
297    /// can return columns that not present in data batch.
298    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
299        self.column_metadatas
300            .iter()
301            .filter(|column| column.semantic_type == SemanticType::Field)
302    }
303
304    /// Returns a column's index in primary key if it is a primary key column.
305    ///
306    /// This does a linear search.
307    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
308        self.primary_key.iter().position(|id| *id == column_id)
309    }
310
311    /// Project the metadata to a new one using specified column ids.
312    ///
313    /// [RegionId] and schema version are preserved.
314    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
315        // check time index
316        ensure!(
317            projection.contains(&self.time_index),
318            TimeIndexNotFoundSnafu
319        );
320
321        // prepare new indices
322        let indices_to_preserve = projection
323            .iter()
324            .map(|id| {
325                self.column_index_by_id(*id)
326                    .with_context(|| InvalidRegionRequestSnafu {
327                        region_id: self.region_id,
328                        err: format!("column id {} not found", id),
329                    })
330            })
331            .collect::<Result<Vec<_>>>()?;
332
333        // project schema
334        let projected_schema =
335            self.schema
336                .try_project(&indices_to_preserve)
337                .with_context(|_| SchemaProjectSnafu {
338                    origin_schema: self.schema.clone(),
339                    projection: projection.to_vec(),
340                })?;
341
342        // project columns, generate projected primary key and new id_to_index
343        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
344        let mut projected_primary_key = vec![];
345        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
346        for index in indices_to_preserve {
347            let col = self.column_metadatas[index].clone();
348            if col.semantic_type == SemanticType::Tag {
349                projected_primary_key.push(col.column_id);
350            }
351            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
352            projected_column_metadatas.push(col);
353        }
354
355        Ok(RegionMetadata {
356            schema: Arc::new(projected_schema),
357            time_index: self.time_index,
358            id_to_index: projected_id_to_index,
359            column_metadatas: projected_column_metadatas,
360            primary_key: projected_primary_key,
361            region_id: self.region_id,
362            schema_version: self.schema_version,
363            primary_key_encoding: self.primary_key_encoding,
364            partition_expr: self.partition_expr.clone(),
365        })
366    }
367
368    /// Gets the column ids to be indexed by inverted index.
369    pub fn inverted_indexed_column_ids<'a>(
370        &self,
371        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
372    ) -> HashSet<ColumnId> {
373        let mut inverted_index = self
374            .column_metadatas
375            .iter()
376            .filter(|column| column.column_schema.is_inverted_indexed())
377            .map(|column| column.column_id)
378            .collect::<HashSet<_>>();
379
380        for ignored in ignore_column_ids {
381            inverted_index.remove(ignored);
382        }
383
384        inverted_index
385    }
386
387    /// Checks whether the metadata is valid.
388    fn validate(&self) -> Result<()> {
389        // Id to name.
390        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
391        for col in &self.column_metadatas {
392            // Validate each column.
393            Self::validate_column_metadata(col)?;
394
395            // Check whether column id is duplicated. We already check column name
396            // is unique in `Schema` so we only check column id here.
397            ensure!(
398                !id_names.contains_key(&col.column_id),
399                InvalidMetaSnafu {
400                    reason: format!(
401                        "column {} and {} have the same column id {}",
402                        id_names[&col.column_id], col.column_schema.name, col.column_id,
403                    ),
404                }
405            );
406            id_names.insert(col.column_id, &col.column_schema.name);
407        }
408
409        // Checks there is only one time index.
410        let time_indexes = self
411            .column_metadatas
412            .iter()
413            .filter(|col| col.semantic_type == SemanticType::Timestamp)
414            .collect::<Vec<_>>();
415        ensure!(
416            time_indexes.len() == 1,
417            InvalidMetaSnafu {
418                reason: format!(
419                    "expect only one time index, found {}: {}",
420                    time_indexes.len(),
421                    time_indexes
422                        .iter()
423                        .map(|c| &c.column_schema.name)
424                        .join(", ")
425                ),
426            }
427        );
428
429        // Checks the time index column is not nullable.
430        ensure!(
431            !self.time_index_column().column_schema.is_nullable(),
432            InvalidMetaSnafu {
433                reason: format!(
434                    "time index column {} must be NOT NULL",
435                    self.time_index_column().column_schema.name
436                ),
437            }
438        );
439
440        if !self.primary_key.is_empty() {
441            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
442            // Checks column ids in the primary key is valid.
443            for column_id in &self.primary_key {
444                // Checks whether the column id exists.
445                ensure!(
446                    id_names.contains_key(column_id),
447                    InvalidMetaSnafu {
448                        reason: format!("unknown column id {}", column_id),
449                    }
450                );
451
452                // Safety: Column with specific id must exist.
453                let column = self.column_by_id(*column_id).unwrap();
454                // Checks duplicate.
455                ensure!(
456                    !pk_ids.contains(&column_id),
457                    InvalidMetaSnafu {
458                        reason: format!(
459                            "duplicate column {} in primary key",
460                            column.column_schema.name
461                        ),
462                    }
463                );
464
465                // Checks this is not a time index column.
466                ensure!(
467                    *column_id != self.time_index,
468                    InvalidMetaSnafu {
469                        reason: format!(
470                            "column {} is already a time index column",
471                            column.column_schema.name,
472                        ),
473                    }
474                );
475
476                // Checks semantic type.
477                ensure!(
478                    column.semantic_type == SemanticType::Tag,
479                    InvalidMetaSnafu {
480                        reason: format!(
481                            "semantic type of column {} should be Tag, not {:?}",
482                            column.column_schema.name, column.semantic_type
483                        ),
484                    }
485                );
486
487                pk_ids.insert(column_id);
488            }
489        }
490
491        // Checks tag semantic type.
492        let num_tag = self
493            .column_metadatas
494            .iter()
495            .filter(|col| col.semantic_type == SemanticType::Tag)
496            .count();
497        ensure!(
498            num_tag == self.primary_key.len(),
499            InvalidMetaSnafu {
500                reason: format!(
501                    "number of primary key columns {} not equal to tag columns {}",
502                    self.primary_key.len(),
503                    num_tag
504                ),
505            }
506        );
507
508        Ok(())
509    }
510
511    /// Checks whether it is a valid column.
512    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
513        if column_metadata.semantic_type == SemanticType::Timestamp {
514            ensure!(
515                column_metadata.column_schema.data_type.is_timestamp(),
516                InvalidMetaSnafu {
517                    reason: format!(
518                        "column `{}` is not timestamp type",
519                        column_metadata.column_schema.name
520                    ),
521                }
522            );
523        }
524
525        ensure!(
526            !is_internal_column(&column_metadata.column_schema.name),
527            InvalidMetaSnafu {
528                reason: format!(
529                    "{} is internal column name that can not be used",
530                    column_metadata.column_schema.name
531                ),
532            }
533        );
534
535        Ok(())
536    }
537}
538
539/// Builder to build [RegionMetadata].
540pub struct RegionMetadataBuilder {
541    region_id: RegionId,
542    column_metadatas: Vec<ColumnMetadata>,
543    primary_key: Vec<ColumnId>,
544    schema_version: u64,
545    primary_key_encoding: PrimaryKeyEncoding,
546    partition_expr: Option<String>,
547}
548
549impl RegionMetadataBuilder {
550    /// Returns a new builder.
551    pub fn new(id: RegionId) -> Self {
552        Self {
553            region_id: id,
554            column_metadatas: vec![],
555            primary_key: vec![],
556            schema_version: 0,
557            primary_key_encoding: PrimaryKeyEncoding::Dense,
558            partition_expr: None,
559        }
560    }
561
562    /// Creates a builder from existing [RegionMetadata].
563    pub fn from_existing(existing: RegionMetadata) -> Self {
564        Self {
565            column_metadatas: existing.column_metadatas,
566            primary_key: existing.primary_key,
567            region_id: existing.region_id,
568            schema_version: existing.schema_version,
569            primary_key_encoding: existing.primary_key_encoding,
570            partition_expr: existing.partition_expr,
571        }
572    }
573
574    /// Sets the primary key encoding mode.
575    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
576        self.primary_key_encoding = encoding;
577        self
578    }
579
580    /// Sets the partition expression in JSON string form.
581    pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
582        self.partition_expr = expr_json;
583        self
584    }
585
586    /// Pushes a new column metadata to this region's metadata.
587    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
588        self.column_metadatas.push(column_metadata);
589        self
590    }
591
592    /// Sets the primary key of the region.
593    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
594        self.primary_key = key;
595        self
596    }
597
598    /// Increases the schema version by 1.
599    pub fn bump_version(&mut self) -> &mut Self {
600        self.schema_version += 1;
601        self
602    }
603
604    /// Applies the alter `kind` to the builder.
605    ///
606    /// The `kind` should be valid.
607    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
608        match kind {
609            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
610            AlterKind::DropColumns { names } => self.drop_columns(&names),
611            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
612            AlterKind::SetIndexes { options } => self.set_indexes(options)?,
613            AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
614            AlterKind::SetRegionOptions { options: _ } => {
615                // nothing to be done with RegionMetadata
616            }
617            AlterKind::UnsetRegionOptions { keys: _ } => {
618                // nothing to be done with RegionMetadata
619            }
620            AlterKind::DropDefaults { names } => {
621                self.drop_defaults(names)?;
622            }
623            AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
624            AlterKind::SyncColumns { column_metadatas } => {
625                self.primary_key = column_metadatas
626                    .iter()
627                    .filter_map(|column_metadata| {
628                        if column_metadata.semantic_type == SemanticType::Tag {
629                            Some(column_metadata.column_id)
630                        } else {
631                            None
632                        }
633                    })
634                    .collect::<Vec<_>>();
635                self.column_metadatas = column_metadatas;
636            }
637        }
638        Ok(self)
639    }
640
641    /// Consumes the builder and build a [RegionMetadata].
642    pub fn build(self) -> Result<RegionMetadata> {
643        self.build_with_options(true)
644    }
645
646    /// Builds metadata without running validation.
647    ///
648    /// Intended for file/external engines that should accept arbitrary schemas
649    /// coming from files.
650    pub fn build_without_validation(self) -> Result<RegionMetadata> {
651        self.build_with_options(false)
652    }
653
654    fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
655        let skipped = SkippedFields::new(&self.column_metadatas)?;
656
657        let meta = RegionMetadata {
658            schema: skipped.schema,
659            time_index: skipped.time_index,
660            id_to_index: skipped.id_to_index,
661            column_metadatas: self.column_metadatas,
662            primary_key: self.primary_key,
663            region_id: self.region_id,
664            schema_version: self.schema_version,
665            primary_key_encoding: self.primary_key_encoding,
666            partition_expr: self.partition_expr,
667        };
668
669        if validate {
670            meta.validate()?;
671        }
672
673        Ok(meta)
674    }
675
676    /// Adds columns to the metadata if not exist.
677    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
678        let mut names: HashSet<_> = self
679            .column_metadatas
680            .iter()
681            .map(|col| col.column_schema.name.clone())
682            .collect();
683
684        for add_column in columns {
685            if names.contains(&add_column.column_metadata.column_schema.name) {
686                // Column already exists.
687                continue;
688            }
689
690            let column_id = add_column.column_metadata.column_id;
691            let semantic_type = add_column.column_metadata.semantic_type;
692            let column_name = add_column.column_metadata.column_schema.name.clone();
693            match add_column.location {
694                None => {
695                    self.column_metadatas.push(add_column.column_metadata);
696                }
697                Some(AddColumnLocation::First) => {
698                    self.column_metadatas.insert(0, add_column.column_metadata);
699                }
700                Some(AddColumnLocation::After { column_name }) => {
701                    let pos = self
702                        .column_metadatas
703                        .iter()
704                        .position(|col| col.column_schema.name == column_name)
705                        .context(InvalidRegionRequestSnafu {
706                            region_id: self.region_id,
707                            err: format!(
708                                "column {} not found, failed to add column {} after it",
709                                column_name, add_column.column_metadata.column_schema.name
710                            ),
711                        })?;
712                    // Insert after pos.
713                    self.column_metadatas
714                        .insert(pos + 1, add_column.column_metadata);
715                }
716            }
717            names.insert(column_name);
718            if semantic_type == SemanticType::Tag {
719                // For a new tag, we extend the primary key.
720                self.primary_key.push(column_id);
721            }
722        }
723
724        Ok(())
725    }
726
727    /// Drops columns from the metadata if exist.
728    fn drop_columns(&mut self, names: &[String]) {
729        let name_set: HashSet<_> = names.iter().collect();
730        self.column_metadatas
731            .retain(|col| !name_set.contains(&col.column_schema.name));
732    }
733
734    /// Changes columns type to the metadata if exist.
735    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
736        let mut change_type_map: HashMap<_, _> = columns
737            .into_iter()
738            .map(
739                |ModifyColumnType {
740                     column_name,
741                     target_type,
742                 }| (column_name, target_type),
743            )
744            .collect();
745
746        for column_meta in self.column_metadatas.iter_mut() {
747            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
748                column_meta.column_schema.data_type = target_type.clone();
749                // also cast default value to target_type if default value exist
750                let new_default =
751                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
752                        Some(
753                            default_value
754                                .cast_to_datatype(&target_type)
755                                .with_context(|_| CastDefaultValueSnafu {
756                                    reason: format!(
757                                        "Failed to cast default value from {:?} to type {:?}",
758                                        default_value, target_type
759                                    ),
760                                })?,
761                        )
762                    } else {
763                        None
764                    };
765                column_meta.column_schema = column_meta
766                    .column_schema
767                    .clone()
768                    .with_default_constraint(new_default.clone())
769                    .with_context(|_| CastDefaultValueSnafu {
770                        reason: format!("Failed to set new default: {:?}", new_default),
771                    })?;
772            }
773        }
774
775        Ok(())
776    }
777
778    fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
779        let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
780        for option in &options {
781            set_index_map
782                .entry(option.column_name())
783                .or_default()
784                .push(option);
785        }
786
787        for column_metadata in self.column_metadatas.iter_mut() {
788            if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
789                for option in options {
790                    Self::set_index(column_metadata, option)?;
791                }
792            }
793        }
794
795        Ok(())
796    }
797
798    fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
799        let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
800        for option in &options {
801            unset_index_map
802                .entry(option.column_name())
803                .or_default()
804                .push(option);
805        }
806
807        for column_metadata in self.column_metadatas.iter_mut() {
808            if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
809                for option in options {
810                    Self::unset_index(column_metadata, option)?;
811                }
812            }
813        }
814
815        Ok(())
816    }
817
818    fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
819        match options {
820            SetIndexOption::Fulltext {
821                column_name,
822                options,
823            } => {
824                ensure!(
825                    column_metadata.column_schema.data_type.is_string(),
826                    InvalidColumnOptionSnafu {
827                        column_name,
828                        msg: "FULLTEXT index only supports string type".to_string(),
829                    }
830                );
831                let current_fulltext_options = column_metadata
832                    .column_schema
833                    .fulltext_options()
834                    .with_context(|_| GetFulltextOptionsSnafu {
835                        column_name: column_name.clone(),
836                    })?;
837                set_column_fulltext_options(
838                    column_metadata,
839                    column_name,
840                    options,
841                    current_fulltext_options,
842                )?;
843            }
844            SetIndexOption::Inverted { .. } => {
845                column_metadata.column_schema.set_inverted_index(true)
846            }
847            SetIndexOption::Skipping {
848                column_name,
849                options,
850            } => {
851                column_metadata
852                    .column_schema
853                    .set_skipping_options(options)
854                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
855            }
856        }
857
858        Ok(())
859    }
860
861    fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
862        match options {
863            UnsetIndexOption::Fulltext { column_name } => {
864                ensure!(
865                    column_metadata.column_schema.data_type.is_string(),
866                    InvalidColumnOptionSnafu {
867                        column_name,
868                        msg: "FULLTEXT index only supports string type".to_string(),
869                    }
870                );
871
872                let current_fulltext_options = column_metadata
873                    .column_schema
874                    .fulltext_options()
875                    .with_context(|_| GetFulltextOptionsSnafu {
876                        column_name: column_name.clone(),
877                    })?;
878
879                unset_column_fulltext_options(
880                    column_metadata,
881                    column_name,
882                    current_fulltext_options,
883                )?;
884            }
885            UnsetIndexOption::Inverted { .. } => {
886                column_metadata.column_schema.set_inverted_index(false)
887            }
888            UnsetIndexOption::Skipping { column_name } => {
889                column_metadata
890                    .column_schema
891                    .unset_skipping_options()
892                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
893            }
894        }
895
896        Ok(())
897    }
898
899    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
900        for name in column_names.iter() {
901            let meta = self
902                .column_metadatas
903                .iter_mut()
904                .find(|col| col.column_schema.name == *name);
905            if let Some(meta) = meta {
906                if !meta.column_schema.is_nullable() {
907                    return InvalidRegionRequestSnafu {
908                        region_id: self.region_id,
909                        err: format!(
910                            "column {name} is not nullable and `default` cannot be dropped",
911                        ),
912                    }
913                    .fail();
914                }
915                meta.column_schema = meta
916                    .column_schema
917                    .clone()
918                    .with_default_constraint(None)
919                    .with_context(|_| CastDefaultValueSnafu {
920                        reason: format!("Failed to drop default : {name:?}"),
921                    })?;
922            } else {
923                return InvalidRegionRequestSnafu {
924                    region_id: self.region_id,
925                    err: format!("column {name} not found",),
926                }
927                .fail();
928            }
929        }
930        Ok(())
931    }
932
933    fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
934        for set_default in set_defaults.iter() {
935            let meta = self
936                .column_metadatas
937                .iter_mut()
938                .find(|col| col.column_schema.name == set_default.name);
939            if let Some(meta) = meta {
940                let default_constraint = common_sql::convert::deserialize_default_constraint(
941                    set_default.default_constraint.as_slice(),
942                    &meta.column_schema.name,
943                    &meta.column_schema.data_type,
944                )
945                .context(SqlCommonSnafu)?;
946
947                meta.column_schema = meta
948                    .column_schema
949                    .clone()
950                    .with_default_constraint(default_constraint)
951                    .with_context(|_| CastDefaultValueSnafu {
952                        reason: format!("Failed to set default : {set_default:?}"),
953                    })?;
954            } else {
955                return InvalidRegionRequestSnafu {
956                    region_id: self.region_id,
957                    err: format!("column {} not found", set_default.name),
958                }
959                .fail();
960            }
961        }
962        Ok(())
963    }
964}
965
/// Fields skipped in serialization.
///
/// All of them are derived from a slice of [ColumnMetadata] by [SkippedFields::new].
struct SkippedFields {
    /// Latest schema, built from the column schemas in their `column_metadatas` order.
    schema: SchemaRef,
    /// Id of the time index column, i.e. the first column whose semantic type is `Timestamp`.
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    id_to_index: HashMap<ColumnId, usize>,
}
975
976impl SkippedFields {
977    /// Constructs skipped fields from `column_metadatas`.
978    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
979        let column_schemas = column_metadatas
980            .iter()
981            .map(|column_metadata| column_metadata.column_schema.clone())
982            .collect();
983        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
984        let time_index = column_metadatas
985            .iter()
986            .find_map(|col| {
987                if col.semantic_type == SemanticType::Timestamp {
988                    Some(col.column_id)
989                } else {
990                    None
991                }
992            })
993            .context(InvalidMetaSnafu {
994                reason: "time index not found",
995            })?;
996        let id_to_index = column_metadatas
997            .iter()
998            .enumerate()
999            .map(|(idx, col)| (col.column_id, idx))
1000            .collect();
1001
1002        Ok(SkippedFields {
1003            schema,
1004            time_index,
1005            id_to_index,
1006        })
1007    }
1008}
1009
1010#[derive(Snafu)]
1011#[snafu(visibility(pub))]
1012#[stack_trace_debug]
1013pub enum MetadataError {
1014    #[snafu(display("Invalid schema"))]
1015    InvalidSchema {
1016        source: datatypes::error::Error,
1017        #[snafu(implicit)]
1018        location: Location,
1019    },
1020
1021    #[snafu(display("Invalid metadata, {}", reason))]
1022    InvalidMeta {
1023        reason: String,
1024        #[snafu(implicit)]
1025        location: Location,
1026    },
1027
1028    #[snafu(display("Failed to ser/de json object"))]
1029    SerdeJson {
1030        #[snafu(implicit)]
1031        location: Location,
1032        #[snafu(source)]
1033        error: serde_json::Error,
1034    },
1035
1036    #[snafu(display("Invalid raw region request, err: {}", err))]
1037    InvalidRawRegionRequest {
1038        err: String,
1039        #[snafu(implicit)]
1040        location: Location,
1041    },
1042
1043    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1044    InvalidRegionRequest {
1045        region_id: RegionId,
1046        err: String,
1047        #[snafu(implicit)]
1048        location: Location,
1049    },
1050
1051    #[snafu(display("Unexpected schema error during project"))]
1052    SchemaProject {
1053        origin_schema: SchemaRef,
1054        projection: Vec<ColumnId>,
1055        #[snafu(implicit)]
1056        location: Location,
1057        source: datatypes::Error,
1058    },
1059
1060    #[snafu(display("Time index column not found"))]
1061    TimeIndexNotFound {
1062        #[snafu(implicit)]
1063        location: Location,
1064    },
1065
1066    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1067    ChangeColumnNotFound {
1068        column_name: String,
1069        region_id: RegionId,
1070        #[snafu(implicit)]
1071        location: Location,
1072    },
1073
1074    #[snafu(display("Failed to convert column schema"))]
1075    ConvertColumnSchema {
1076        source: api::error::Error,
1077        #[snafu(implicit)]
1078        location: Location,
1079    },
1080
1081    #[snafu(display("Failed to convert TimeRanges"))]
1082    ConvertTimeRanges {
1083        source: api::error::Error,
1084        #[snafu(implicit)]
1085        location: Location,
1086    },
1087
1088    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1089    InvalidSetRegionOptionRequest {
1090        key: String,
1091        value: String,
1092        #[snafu(implicit)]
1093        location: Location,
1094    },
1095
1096    #[snafu(display("Invalid set region option request, key: {}", key))]
1097    InvalidUnsetRegionOptionRequest {
1098        key: String,
1099        #[snafu(implicit)]
1100        location: Location,
1101    },
1102
1103    #[snafu(display("Failed to decode protobuf"))]
1104    DecodeProto {
1105        #[snafu(source)]
1106        error: prost::UnknownEnumValue,
1107        #[snafu(implicit)]
1108        location: Location,
1109    },
1110
1111    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1112    InvalidColumnOption {
1113        column_name: String,
1114        msg: String,
1115        #[snafu(implicit)]
1116        location: Location,
1117    },
1118
1119    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1120    SetFulltextOptions {
1121        column_name: String,
1122        source: datatypes::Error,
1123        #[snafu(implicit)]
1124        location: Location,
1125    },
1126
1127    #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1128    GetFulltextOptions {
1129        column_name: String,
1130        source: datatypes::Error,
1131        #[snafu(implicit)]
1132        location: Location,
1133    },
1134
1135    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1136    SetSkippingIndexOptions {
1137        column_name: String,
1138        source: datatypes::Error,
1139        #[snafu(implicit)]
1140        location: Location,
1141    },
1142
1143    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1144    UnsetSkippingIndexOptions {
1145        column_name: String,
1146        source: datatypes::Error,
1147        #[snafu(implicit)]
1148        location: Location,
1149    },
1150
1151    #[snafu(display("Failed to decode arrow ipc record batches"))]
1152    DecodeArrowIpc {
1153        #[snafu(source)]
1154        error: arrow::error::ArrowError,
1155        #[snafu(implicit)]
1156        location: Location,
1157    },
1158
1159    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1160    CastDefaultValue {
1161        reason: String,
1162        source: datatypes::Error,
1163        #[snafu(implicit)]
1164        location: Location,
1165    },
1166
1167    #[snafu(display("Unexpected: {}", reason))]
1168    Unexpected {
1169        reason: String,
1170        #[snafu(implicit)]
1171        location: Location,
1172    },
1173
1174    #[snafu(display("Failed to encode/decode flight message"))]
1175    FlightCodec {
1176        source: common_grpc::Error,
1177        #[snafu(implicit)]
1178        location: Location,
1179    },
1180
1181    #[snafu(display("Invalid index option"))]
1182    InvalidIndexOption {
1183        #[snafu(implicit)]
1184        location: Location,
1185        #[snafu(source)]
1186        error: datatypes::error::Error,
1187    },
1188
1189    #[snafu(display("Sql common error"))]
1190    SqlCommon {
1191        source: common_sql::error::Error,
1192        #[snafu(implicit)]
1193        location: Location,
1194    },
1195}
1196
1197impl ErrorExt for MetadataError {
1198    fn status_code(&self) -> StatusCode {
1199        match self {
1200            Self::SqlCommon { source, .. } => source.status_code(),
1201            _ => StatusCode::InvalidArguments,
1202        }
1203    }
1204
1205    fn as_any(&self) -> &dyn Any {
1206        self
1207    }
1208}
1209
1210/// Set column fulltext options if it passed the validation.
1211///
1212/// Options allowed to modify:
1213/// * backend
1214///
1215/// Options not allowed to modify:
1216/// * analyzer
1217/// * case_sensitive
1218fn set_column_fulltext_options(
1219    column_meta: &mut ColumnMetadata,
1220    column_name: &str,
1221    options: &FulltextOptions,
1222    current_options: Option<FulltextOptions>,
1223) -> Result<()> {
1224    if let Some(current_options) = current_options {
1225        ensure!(
1226            current_options.analyzer == options.analyzer
1227                && current_options.case_sensitive == options.case_sensitive,
1228            InvalidColumnOptionSnafu {
1229                column_name,
1230                msg: format!(
1231                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1232                    current_options.analyzer, current_options.case_sensitive
1233                ),
1234            }
1235        );
1236    }
1237
1238    column_meta
1239        .column_schema
1240        .set_fulltext_options(options)
1241        .context(SetFulltextOptionsSnafu { column_name })?;
1242
1243    Ok(())
1244}
1245
1246fn unset_column_fulltext_options(
1247    column_meta: &mut ColumnMetadata,
1248    column_name: &str,
1249    current_options: Option<FulltextOptions>,
1250) -> Result<()> {
1251    if let Some(mut current_options) = current_options
1252        && current_options.enable
1253    {
1254        current_options.enable = false;
1255        column_meta
1256            .column_schema
1257            .set_fulltext_options(&current_options)
1258            .context(SetFulltextOptionsSnafu { column_name })?;
1259    } else {
1260        return InvalidColumnOptionSnafu {
1261            column_name,
1262            msg: "FULLTEXT index already disabled",
1263        }
1264        .fail();
1265    }
1266
1267    Ok(())
1268}
1269
1270#[cfg(test)]
1271mod test {
1272    use datatypes::prelude::ConcreteDataType;
1273    use datatypes::schema::{
1274        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1275    };
1276    use datatypes::value::Value;
1277
1278    use super::*;
1279
1280    fn create_builder() -> RegionMetadataBuilder {
1281        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1282    }
1283
1284    fn build_test_region_metadata() -> RegionMetadata {
1285        let mut builder = create_builder();
1286        builder
1287            .push_column_metadata(ColumnMetadata {
1288                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1289                semantic_type: SemanticType::Tag,
1290                column_id: 1,
1291            })
1292            .push_column_metadata(ColumnMetadata {
1293                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1294                semantic_type: SemanticType::Field,
1295                column_id: 2,
1296            })
1297            .push_column_metadata(ColumnMetadata {
1298                column_schema: ColumnSchema::new(
1299                    "c",
1300                    ConcreteDataType::timestamp_millisecond_datatype(),
1301                    false,
1302                ),
1303                semantic_type: SemanticType::Timestamp,
1304                column_id: 3,
1305            })
1306            .primary_key(vec![1])
1307            .partition_expr_json(Some("".to_string()));
1308        builder.build().unwrap()
1309    }
1310
1311    #[test]
1312    fn test_region_metadata() {
1313        let region_metadata = build_test_region_metadata();
1314        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1315        assert_eq!(
1316            "a",
1317            region_metadata.column_by_id(1).unwrap().column_schema.name
1318        );
1319        assert_eq!(None, region_metadata.column_by_id(10));
1320    }
1321
1322    #[test]
1323    fn test_region_metadata_serde() {
1324        let region_metadata = build_test_region_metadata();
1325        let serialized = serde_json::to_string(&region_metadata).unwrap();
1326        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1327        assert_eq!(region_metadata, deserialized);
1328    }
1329
1330    #[test]
1331    fn test_column_metadata_validate() {
1332        let mut builder = create_builder();
1333        let col = ColumnMetadata {
1334            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1335            semantic_type: SemanticType::Timestamp,
1336            column_id: 1,
1337        };
1338
1339        builder.push_column_metadata(col);
1340        let err = builder.build().unwrap_err();
1341        assert!(
1342            err.to_string()
1343                .contains("column `ts` is not timestamp type"),
1344            "unexpected err: {err}",
1345        );
1346    }
1347
1348    #[test]
1349    fn test_empty_region_metadata() {
1350        let builder = create_builder();
1351        let err = builder.build().unwrap_err();
1352        // A region must have a time index.
1353        assert!(
1354            err.to_string().contains("time index not found"),
1355            "unexpected err: {err}",
1356        );
1357    }
1358
1359    #[test]
1360    fn test_same_column_id() {
1361        let mut builder = create_builder();
1362        builder
1363            .push_column_metadata(ColumnMetadata {
1364                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1365                semantic_type: SemanticType::Tag,
1366                column_id: 1,
1367            })
1368            .push_column_metadata(ColumnMetadata {
1369                column_schema: ColumnSchema::new(
1370                    "b",
1371                    ConcreteDataType::timestamp_millisecond_datatype(),
1372                    false,
1373                ),
1374                semantic_type: SemanticType::Timestamp,
1375                column_id: 1,
1376            });
1377        let err = builder.build().unwrap_err();
1378        assert!(
1379            err.to_string()
1380                .contains("column a and b have the same column id"),
1381            "unexpected err: {err}",
1382        );
1383    }
1384
1385    #[test]
1386    fn test_duplicate_time_index() {
1387        let mut builder = create_builder();
1388        builder
1389            .push_column_metadata(ColumnMetadata {
1390                column_schema: ColumnSchema::new(
1391                    "a",
1392                    ConcreteDataType::timestamp_millisecond_datatype(),
1393                    false,
1394                ),
1395                semantic_type: SemanticType::Timestamp,
1396                column_id: 1,
1397            })
1398            .push_column_metadata(ColumnMetadata {
1399                column_schema: ColumnSchema::new(
1400                    "b",
1401                    ConcreteDataType::timestamp_millisecond_datatype(),
1402                    false,
1403                ),
1404                semantic_type: SemanticType::Timestamp,
1405                column_id: 2,
1406            });
1407        let err = builder.build().unwrap_err();
1408        assert!(
1409            err.to_string().contains("expect only one time index"),
1410            "unexpected err: {err}",
1411        );
1412    }
1413
1414    #[test]
1415    fn test_unknown_primary_key() {
1416        let mut builder = create_builder();
1417        builder
1418            .push_column_metadata(ColumnMetadata {
1419                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1420                semantic_type: SemanticType::Tag,
1421                column_id: 1,
1422            })
1423            .push_column_metadata(ColumnMetadata {
1424                column_schema: ColumnSchema::new(
1425                    "b",
1426                    ConcreteDataType::timestamp_millisecond_datatype(),
1427                    false,
1428                ),
1429                semantic_type: SemanticType::Timestamp,
1430                column_id: 2,
1431            })
1432            .primary_key(vec![3]);
1433        let err = builder.build().unwrap_err();
1434        assert!(
1435            err.to_string().contains("unknown column id 3"),
1436            "unexpected err: {err}",
1437        );
1438    }
1439
1440    #[test]
1441    fn test_same_primary_key() {
1442        let mut builder = create_builder();
1443        builder
1444            .push_column_metadata(ColumnMetadata {
1445                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1446                semantic_type: SemanticType::Tag,
1447                column_id: 1,
1448            })
1449            .push_column_metadata(ColumnMetadata {
1450                column_schema: ColumnSchema::new(
1451                    "b",
1452                    ConcreteDataType::timestamp_millisecond_datatype(),
1453                    false,
1454                ),
1455                semantic_type: SemanticType::Timestamp,
1456                column_id: 2,
1457            })
1458            .primary_key(vec![1, 1]);
1459        let err = builder.build().unwrap_err();
1460        assert!(
1461            err.to_string()
1462                .contains("duplicate column a in primary key"),
1463            "unexpected err: {err}",
1464        );
1465    }
1466
1467    #[test]
1468    fn test_in_time_index() {
1469        let mut builder = create_builder();
1470        builder
1471            .push_column_metadata(ColumnMetadata {
1472                column_schema: ColumnSchema::new(
1473                    "ts",
1474                    ConcreteDataType::timestamp_millisecond_datatype(),
1475                    false,
1476                ),
1477                semantic_type: SemanticType::Timestamp,
1478                column_id: 1,
1479            })
1480            .primary_key(vec![1]);
1481        let err = builder.build().unwrap_err();
1482        assert!(
1483            err.to_string()
1484                .contains("column ts is already a time index column"),
1485            "unexpected err: {err}",
1486        );
1487    }
1488
1489    #[test]
1490    fn test_nullable_time_index() {
1491        let mut builder = create_builder();
1492        builder.push_column_metadata(ColumnMetadata {
1493            column_schema: ColumnSchema::new(
1494                "ts",
1495                ConcreteDataType::timestamp_millisecond_datatype(),
1496                true,
1497            ),
1498            semantic_type: SemanticType::Timestamp,
1499            column_id: 1,
1500        });
1501        let err = builder.build().unwrap_err();
1502        assert!(
1503            err.to_string()
1504                .contains("time index column ts must be NOT NULL"),
1505            "unexpected err: {err}",
1506        );
1507    }
1508
1509    #[test]
1510    fn test_primary_key_semantic_type() {
1511        let mut builder = create_builder();
1512        builder
1513            .push_column_metadata(ColumnMetadata {
1514                column_schema: ColumnSchema::new(
1515                    "ts",
1516                    ConcreteDataType::timestamp_millisecond_datatype(),
1517                    false,
1518                ),
1519                semantic_type: SemanticType::Timestamp,
1520                column_id: 1,
1521            })
1522            .push_column_metadata(ColumnMetadata {
1523                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1524                semantic_type: SemanticType::Field,
1525                column_id: 2,
1526            })
1527            .primary_key(vec![2]);
1528        let err = builder.build().unwrap_err();
1529        assert!(
1530            err.to_string()
1531                .contains("semantic type of column a should be Tag, not Field"),
1532            "unexpected err: {err}",
1533        );
1534    }
1535
1536    #[test]
1537    fn test_primary_key_tag_num() {
1538        let mut builder = create_builder();
1539        builder
1540            .push_column_metadata(ColumnMetadata {
1541                column_schema: ColumnSchema::new(
1542                    "ts",
1543                    ConcreteDataType::timestamp_millisecond_datatype(),
1544                    false,
1545                ),
1546                semantic_type: SemanticType::Timestamp,
1547                column_id: 1,
1548            })
1549            .push_column_metadata(ColumnMetadata {
1550                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1551                semantic_type: SemanticType::Tag,
1552                column_id: 2,
1553            })
1554            .push_column_metadata(ColumnMetadata {
1555                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1556                semantic_type: SemanticType::Tag,
1557                column_id: 3,
1558            })
1559            .primary_key(vec![2]);
1560        let err = builder.build().unwrap_err();
1561        assert!(
1562            err.to_string()
1563                .contains("number of primary key columns 1 not equal to tag columns 2"),
1564            "unexpected err: {err}",
1565        );
1566    }
1567
1568    #[test]
1569    fn test_bump_version() {
1570        let mut region_metadata = build_test_region_metadata();
1571        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1572        builder.bump_version();
1573        let new_meta = builder.build().unwrap();
1574        region_metadata.schema_version += 1;
1575        assert_eq!(region_metadata, new_meta);
1576    }
1577
1578    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1579        let semantic_type = if is_tag {
1580            SemanticType::Tag
1581        } else {
1582            SemanticType::Field
1583        };
1584        ColumnMetadata {
1585            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1586            semantic_type,
1587            column_id,
1588        }
1589    }
1590
1591    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1592        let actual: Vec<_> = metadata
1593            .column_metadatas
1594            .iter()
1595            .map(|col| &col.column_schema.name)
1596            .collect();
1597        assert_eq!(names, actual);
1598    }
1599
1600    fn get_columns_default_constraint(
1601        metadata: &RegionMetadata,
1602        name: String,
1603    ) -> Option<Option<&ColumnDefaultConstraint>> {
1604        metadata.column_metadatas.iter().find_map(|col| {
1605            if col.column_schema.name == name {
1606                Some(col.column_schema.default_constraint())
1607            } else {
1608                None
1609            }
1610        })
1611    }
1612
1613    #[test]
1614    fn test_alter() {
1615        // a (tag), b (field), c (ts)
1616        let metadata = build_test_region_metadata();
1617        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1618        // tag d
1619        builder
1620            .alter(AlterKind::AddColumns {
1621                columns: vec![AddColumn {
1622                    column_metadata: new_column_metadata("d", true, 4),
1623                    location: None,
1624                }],
1625            })
1626            .unwrap();
1627        let metadata = builder.build().unwrap();
1628        check_columns(&metadata, &["a", "b", "c", "d"]);
1629        assert_eq!([1, 4], &metadata.primary_key[..]);
1630
1631        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1632        builder
1633            .alter(AlterKind::AddColumns {
1634                columns: vec![AddColumn {
1635                    column_metadata: new_column_metadata("e", false, 5),
1636                    location: Some(AddColumnLocation::First),
1637                }],
1638            })
1639            .unwrap();
1640        let metadata = builder.build().unwrap();
1641        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1642
1643        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1644        builder
1645            .alter(AlterKind::AddColumns {
1646                columns: vec![AddColumn {
1647                    column_metadata: new_column_metadata("f", false, 6),
1648                    location: Some(AddColumnLocation::After {
1649                        column_name: "b".to_string(),
1650                    }),
1651                }],
1652            })
1653            .unwrap();
1654        let metadata = builder.build().unwrap();
1655        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1656
1657        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1658        builder
1659            .alter(AlterKind::AddColumns {
1660                columns: vec![AddColumn {
1661                    column_metadata: new_column_metadata("g", false, 7),
1662                    location: Some(AddColumnLocation::After {
1663                        column_name: "d".to_string(),
1664                    }),
1665                }],
1666            })
1667            .unwrap();
1668        let metadata = builder.build().unwrap();
1669        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1670
1671        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1672        builder
1673            .alter(AlterKind::DropColumns {
1674                names: vec!["g".to_string(), "e".to_string()],
1675            })
1676            .unwrap();
1677        let metadata = builder.build().unwrap();
1678        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1679
1680        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1681        builder
1682            .alter(AlterKind::DropColumns {
1683                names: vec!["a".to_string()],
1684            })
1685            .unwrap();
1686        // Build returns error as the primary key contains a.
1687        let err = builder.build().unwrap_err();
1688        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1689
1690        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1691        let mut column_metadata = new_column_metadata("g", false, 8);
1692        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1693        column_metadata.column_schema = column_metadata
1694            .column_schema
1695            .with_default_constraint(default_constraint.clone())
1696            .unwrap();
1697        builder
1698            .alter(AlterKind::AddColumns {
1699                columns: vec![AddColumn {
1700                    column_metadata,
1701                    location: None,
1702                }],
1703            })
1704            .unwrap();
1705        let metadata = builder.build().unwrap();
1706        assert_eq!(
1707            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1708            default_constraint.as_ref()
1709        );
1710        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1711
1712        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1713        builder
1714            .alter(AlterKind::DropDefaults {
1715                names: vec!["g".to_string()],
1716            })
1717            .unwrap();
1718        let metadata = builder.build().unwrap();
1719        assert_eq!(
1720            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1721            None
1722        );
1723        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1724
1725        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1726        builder
1727            .alter(AlterKind::DropColumns {
1728                names: vec!["g".to_string()],
1729            })
1730            .unwrap();
1731        let metadata = builder.build().unwrap();
1732        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1733
1734        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1735        builder
1736            .alter(AlterKind::ModifyColumnTypes {
1737                columns: vec![ModifyColumnType {
1738                    column_name: "b".to_string(),
1739                    target_type: ConcreteDataType::string_datatype(),
1740                }],
1741            })
1742            .unwrap();
1743        let metadata = builder.build().unwrap();
1744        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1745        let b_type = &metadata
1746            .column_by_name("b")
1747            .unwrap()
1748            .column_schema
1749            .data_type;
1750        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1751
1752        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1753        builder
1754            .alter(AlterKind::SetIndexes {
1755                options: vec![SetIndexOption::Fulltext {
1756                    column_name: "b".to_string(),
1757                    options: FulltextOptions::new_unchecked(
1758                        true,
1759                        FulltextAnalyzer::Chinese,
1760                        true,
1761                        FulltextBackend::Bloom,
1762                        1000,
1763                        0.01,
1764                    ),
1765                }],
1766            })
1767            .unwrap();
1768        let metadata = builder.build().unwrap();
1769        let a_fulltext_options = metadata
1770            .column_by_name("b")
1771            .unwrap()
1772            .column_schema
1773            .fulltext_options()
1774            .unwrap()
1775            .unwrap();
1776        assert!(a_fulltext_options.enable);
1777        assert_eq!(
1778            datatypes::schema::FulltextAnalyzer::Chinese,
1779            a_fulltext_options.analyzer
1780        );
1781        assert!(a_fulltext_options.case_sensitive);
1782
1783        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1784        builder
1785            .alter(AlterKind::UnsetIndexes {
1786                options: vec![UnsetIndexOption::Fulltext {
1787                    column_name: "b".to_string(),
1788                }],
1789            })
1790            .unwrap();
1791        let metadata = builder.build().unwrap();
1792        let a_fulltext_options = metadata
1793            .column_by_name("b")
1794            .unwrap()
1795            .column_schema
1796            .fulltext_options()
1797            .unwrap()
1798            .unwrap();
1799        assert!(!a_fulltext_options.enable);
1800        assert_eq!(
1801            datatypes::schema::FulltextAnalyzer::Chinese,
1802            a_fulltext_options.analyzer
1803        );
1804        assert!(a_fulltext_options.case_sensitive);
1805    }
1806
1807    #[test]
1808    fn test_add_if_not_exists() {
1809        // a (tag), b (field), c (ts)
1810        let metadata = build_test_region_metadata();
1811        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1812        // tag d
1813        builder
1814            .alter(AlterKind::AddColumns {
1815                columns: vec![
1816                    AddColumn {
1817                        column_metadata: new_column_metadata("d", true, 4),
1818                        location: None,
1819                    },
1820                    AddColumn {
1821                        column_metadata: new_column_metadata("d", true, 4),
1822                        location: None,
1823                    },
1824                ],
1825            })
1826            .unwrap();
1827        let metadata = builder.build().unwrap();
1828        check_columns(&metadata, &["a", "b", "c", "d"]);
1829        assert_eq!([1, 4], &metadata.primary_key[..]);
1830
1831        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1832        // field b.
1833        builder
1834            .alter(AlterKind::AddColumns {
1835                columns: vec![AddColumn {
1836                    column_metadata: new_column_metadata("b", false, 2),
1837                    location: None,
1838                }],
1839            })
1840            .unwrap();
1841        let metadata = builder.build().unwrap();
1842        check_columns(&metadata, &["a", "b", "c", "d"]);
1843    }
1844
1845    #[test]
1846    fn test_add_column_with_inverted_index() {
1847        // only set inverted index to true explicitly will this column be inverted indexed
1848
1849        // a (tag), b (field), c (ts)
1850        let metadata = build_test_region_metadata();
1851        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1852        // tag d, e
1853        let mut col = new_column_metadata("d", true, 4);
1854        col.column_schema.set_inverted_index(true);
1855        builder
1856            .alter(AlterKind::AddColumns {
1857                columns: vec![
1858                    AddColumn {
1859                        column_metadata: col,
1860                        location: None,
1861                    },
1862                    AddColumn {
1863                        column_metadata: new_column_metadata("e", true, 5),
1864                        location: None,
1865                    },
1866                ],
1867            })
1868            .unwrap();
1869        let metadata = builder.build().unwrap();
1870        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1871        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1872        let column_metadata = metadata.column_by_name("a").unwrap();
1873        assert!(!column_metadata.column_schema.is_inverted_indexed());
1874        let column_metadata = metadata.column_by_name("b").unwrap();
1875        assert!(!column_metadata.column_schema.is_inverted_indexed());
1876        let column_metadata = metadata.column_by_name("c").unwrap();
1877        assert!(!column_metadata.column_schema.is_inverted_indexed());
1878        let column_metadata = metadata.column_by_name("d").unwrap();
1879        assert!(column_metadata.column_schema.is_inverted_indexed());
1880        let column_metadata = metadata.column_by_name("e").unwrap();
1881        assert!(!column_metadata.column_schema.is_inverted_indexed());
1882    }
1883
1884    #[test]
1885    fn test_drop_if_exists() {
1886        // a (tag), b (field), c (ts)
1887        let metadata = build_test_region_metadata();
1888        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1889        // field d, e
1890        builder
1891            .alter(AlterKind::AddColumns {
1892                columns: vec![
1893                    AddColumn {
1894                        column_metadata: new_column_metadata("d", false, 4),
1895                        location: None,
1896                    },
1897                    AddColumn {
1898                        column_metadata: new_column_metadata("e", false, 5),
1899                        location: None,
1900                    },
1901                ],
1902            })
1903            .unwrap();
1904        let metadata = builder.build().unwrap();
1905        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1906
1907        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1908        builder
1909            .alter(AlterKind::DropColumns {
1910                names: vec!["b".to_string(), "b".to_string()],
1911            })
1912            .unwrap();
1913        let metadata = builder.build().unwrap();
1914        check_columns(&metadata, &["a", "c", "d", "e"]);
1915
1916        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1917        builder
1918            .alter(AlterKind::DropColumns {
1919                names: vec!["b".to_string(), "e".to_string()],
1920            })
1921            .unwrap();
1922        let metadata = builder.build().unwrap();
1923        check_columns(&metadata, &["a", "c", "d"]);
1924    }
1925
1926    #[test]
1927    fn test_invalid_column_name() {
1928        let mut builder = create_builder();
1929        builder.push_column_metadata(ColumnMetadata {
1930            column_schema: ColumnSchema::new(
1931                "__sequence",
1932                ConcreteDataType::timestamp_millisecond_datatype(),
1933                false,
1934            ),
1935            semantic_type: SemanticType::Timestamp,
1936            column_id: 1,
1937        });
1938        let err = builder.build().unwrap_err();
1939        assert!(
1940            err.to_string()
1941                .contains("internal column name that can not be used"),
1942            "unexpected err: {err}",
1943        );
1944    }
1945
1946    #[test]
1947    fn test_allow_internal_column_name() {
1948        let mut builder = create_builder();
1949        builder
1950            .push_column_metadata(ColumnMetadata {
1951                column_schema: ColumnSchema::new(
1952                    "__primary_key",
1953                    ConcreteDataType::string_datatype(),
1954                    false,
1955                ),
1956                semantic_type: SemanticType::Tag,
1957                column_id: 1,
1958            })
1959            .push_column_metadata(ColumnMetadata {
1960                column_schema: ColumnSchema::new(
1961                    "ts",
1962                    ConcreteDataType::timestamp_millisecond_datatype(),
1963                    false,
1964                ),
1965                semantic_type: SemanticType::Timestamp,
1966                column_id: 2,
1967            })
1968            .primary_key(vec![1]);
1969
1970        let metadata = builder.build_without_validation().unwrap();
1971        assert_eq!(
1972            "__primary_key",
1973            metadata.column_metadatas[0].column_schema.name
1974        );
1975    }
1976
1977    #[test]
1978    fn test_build_without_validation() {
1979        // Primary key points to a Field column, which would normally fail validation.
1980        let mut builder = create_builder();
1981        builder
1982            .push_column_metadata(ColumnMetadata {
1983                column_schema: ColumnSchema::new(
1984                    "ts",
1985                    ConcreteDataType::timestamp_millisecond_datatype(),
1986                    false,
1987                ),
1988                semantic_type: SemanticType::Timestamp,
1989                column_id: 1,
1990            })
1991            .push_column_metadata(ColumnMetadata {
1992                column_schema: ColumnSchema::new(
1993                    "field",
1994                    ConcreteDataType::string_datatype(),
1995                    true,
1996                ),
1997                semantic_type: SemanticType::Field,
1998                column_id: 2,
1999            })
2000            .primary_key(vec![2]);
2001
2002        // Unvalidated build should succeed.
2003        let metadata = builder.build_without_validation().unwrap();
2004        assert_eq!(vec![2], metadata.primary_key);
2005
2006        // Validated build still rejects it.
2007        let mut builder = create_builder();
2008        builder
2009            .push_column_metadata(ColumnMetadata {
2010                column_schema: ColumnSchema::new(
2011                    "ts",
2012                    ConcreteDataType::timestamp_millisecond_datatype(),
2013                    false,
2014                ),
2015                semantic_type: SemanticType::Timestamp,
2016                column_id: 1,
2017            })
2018            .push_column_metadata(ColumnMetadata {
2019                column_schema: ColumnSchema::new(
2020                    "field",
2021                    ConcreteDataType::string_datatype(),
2022                    true,
2023                ),
2024                semantic_type: SemanticType::Field,
2025                column_id: 2,
2026            })
2027            .primary_key(vec![2]);
2028        let err = builder.build().unwrap_err();
2029        assert!(
2030            err.to_string()
2031                .contains("semantic type of column field should be Tag"),
2032            "unexpected err: {err}"
2033        );
2034    }
2035
2036    #[test]
2037    fn test_debug_for_column_metadata() {
2038        let region_metadata = build_test_region_metadata();
2039        let formatted = format!("{:?}", region_metadata);
2040        assert_eq!(
2041            formatted,
2042            "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
2043        );
2044    }
2045
2046    #[test]
2047    fn test_region_metadata_deserialize_default_primary_key_encoding() {
2048        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
2049        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2050        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
2051
2052        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
2053        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2054        assert_eq!(
2055            deserialized.primary_key_encoding,
2056            PrimaryKeyEncoding::Sparse
2057        );
2058    }
2059}