store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use std::{fmt, mem};
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_base::hash::partition_expr_version;
28use common_error::ext::ErrorExt;
29use common_error::status_code::StatusCode;
30use common_macro::stack_trace_debug;
31use datatypes::arrow;
32use datatypes::arrow::datatypes::FieldRef;
33use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
34use datatypes::types::TimestampType;
35use itertools::Itertools;
36use serde::de::Error;
37use serde::{Deserialize, Deserializer, Serialize};
38use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
39
40use crate::codec::PrimaryKeyEncoding;
41use crate::region_request::{
42    AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
43};
44use crate::storage::consts::is_internal_column;
45use crate::storage::{ColumnId, RegionId};
46
/// Result type used throughout this mod; the error is always [MetadataError].
pub type Result<T> = std::result::Result<T, MetadataError>;
48
/// Metadata of a column.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Immutable id of this column. Validation of [RegionMetadata] rejects
    /// two columns of the same region that share a column id.
    pub column_id: ColumnId,
}
59
60impl fmt::Debug for ColumnMetadata {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(
63            f,
64            "[{:?} {:?} {:?}]",
65            self.column_schema, self.semantic_type, self.column_id,
66        )
67    }
68}
69
70impl ColumnMetadata {
71    /// Construct `Self` from protobuf struct [RegionColumnDef]
72    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
73        let column_id = column_def.column_id;
74        let column_def = column_def
75            .column_def
76            .context(InvalidRawRegionRequestSnafu {
77                err: "column_def is absent",
78            })?;
79        let semantic_type = column_def.semantic_type();
80        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
81
82        Ok(Self {
83            column_schema,
84            semantic_type,
85            column_id,
86        })
87    }
88
89    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
90    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
91        serde_json::to_vec(columns)
92    }
93
94    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
95    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
96        serde_json::from_slice(bytes)
97    }
98
99    pub fn is_same_datatype(&self, other: &Self) -> bool {
100        self.column_schema.data_type == other.column_schema.data_type
101    }
102
103    /// Returns the estimated memory footprint of this metadata.
104    pub fn estimated_size(&self) -> usize {
105        mem::size_of_val(self) - mem::size_of_val(&self.column_schema)
106            + self.column_schema.estimated_size()
107    }
108}
109
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    pub schema: SchemaRef,

    // We don't make `time_index` and `id_to_index` public and always construct them
    // via [SkippedFields], so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Partition expression serialized as a JSON string.
    /// Compatibility behavior:
    /// - None: no partition expr was ever set in the manifest (legacy regions).
    /// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is.
    pub partition_expr: Option<String>,
    /// Version computed from `partition_expr` by `partition_expr_version`.
    ///
    /// It is never serialized; deserialization recomputes it from the decoded
    /// partition expression.
    #[serde(skip)]
    pub partition_expr_version: u64,
}
173
174impl fmt::Debug for RegionMetadata {
175    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
176        f.debug_struct("RegionMetadata")
177            .field("column_metadatas", &self.column_metadatas)
178            .field("time_index", &self.time_index)
179            .field("primary_key", &self.primary_key)
180            .field("region_id", &self.region_id)
181            .field("schema_version", &self.schema_version)
182            .field("partition_expr", &self.partition_expr)
183            .finish()
184    }
185}
186
/// Shared, reference-counted handle to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
188
189impl<'de> Deserialize<'de> for RegionMetadata {
190    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
191    where
192        D: Deserializer<'de>,
193    {
194        // helper internal struct for deserialization
195        #[derive(Deserialize)]
196        struct RegionMetadataWithoutSchema {
197            column_metadatas: Vec<ColumnMetadata>,
198            primary_key: Vec<ColumnId>,
199            region_id: RegionId,
200            schema_version: u64,
201            #[serde(default)]
202            primary_key_encoding: PrimaryKeyEncoding,
203            #[serde(default)]
204            partition_expr: Option<String>,
205        }
206
207        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
208        let skipped =
209            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
210
211        let partition_expr_version =
212            partition_expr_version(without_schema.partition_expr.as_deref());
213
214        Ok(Self {
215            schema: skipped.schema,
216            time_index: skipped.time_index,
217            id_to_index: skipped.id_to_index,
218            column_metadatas: without_schema.column_metadatas,
219            primary_key: without_schema.primary_key,
220            region_id: without_schema.region_id,
221            schema_version: without_schema.schema_version,
222            primary_key_encoding: without_schema.primary_key_encoding,
223            partition_expr: without_schema.partition_expr,
224            partition_expr_version,
225        })
226    }
227}
228
229impl RegionMetadata {
230    /// Decode the metadata from a JSON str.
231    pub fn from_json(s: &str) -> Result<Self> {
232        serde_json::from_str(s).context(SerdeJsonSnafu)
233    }
234
235    /// Returns the estimated memory footprint of this metadata.
236    pub fn estimated_size(&self) -> usize {
237        mem::size_of_val(self)
238            + mem::size_of::<ColumnMetadata>() * self.column_metadatas.capacity()
239            + self
240                .column_metadatas
241                .iter()
242                .map(|column| column.estimated_size() - mem::size_of::<ColumnMetadata>())
243                .sum::<usize>()
244            + mem::size_of::<ColumnId>() * self.primary_key.capacity()
245            + mem::size_of::<(ColumnId, usize)>() * self.id_to_index.capacity()
246            + self.schema.estimated_size()
247            + self
248                .partition_expr
249                .as_ref()
250                .map(|expr| expr.capacity())
251                .unwrap_or_default()
252    }
253
254    /// Encode the metadata to a JSON string.
255    pub fn to_json(&self) -> Result<String> {
256        serde_json::to_string(&self).context(SerdeJsonSnafu)
257    }
258
259    pub fn set_partition_expr(&mut self, expr: Option<String>) {
260        self.partition_expr_version = partition_expr_version(expr.as_deref());
261        self.partition_expr = expr;
262    }
263
264    /// Find column by id.
265    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
266        self.id_to_index
267            .get(&column_id)
268            .map(|index| &self.column_metadatas[*index])
269    }
270
271    /// Find column index by id.
272    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
273        self.id_to_index.get(&column_id).copied()
274    }
275
276    /// Find column index by name.
277    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
278        self.column_metadatas
279            .iter()
280            .position(|col| col.column_schema.name == column_name)
281    }
282
283    /// Returns the time index column
284    ///
285    /// # Panics
286    /// Panics if the time index column id is invalid.
287    pub fn time_index_column(&self) -> &ColumnMetadata {
288        let index = self.id_to_index[&self.time_index];
289        &self.column_metadatas[index]
290    }
291
292    /// Returns timestamp type of time index column
293    ///
294    /// # Panics
295    /// Panics if the time index column id is invalid.
296    pub fn time_index_type(&self) -> TimestampType {
297        let index = self.id_to_index[&self.time_index];
298        self.column_metadatas[index]
299            .column_schema
300            .data_type
301            .as_timestamp()
302            .unwrap()
303    }
304
    /// Returns the position of the time index column in `column_metadatas`.
    ///
    /// # Panics
    /// Panics if the time index column id is not present in `id_to_index`.
    pub fn time_index_column_pos(&self) -> usize {
        self.id_to_index[&self.time_index]
    }
309
310    /// Returns the arrow field of the time index column.
311    pub fn time_index_field(&self) -> FieldRef {
312        let index = self.id_to_index[&self.time_index];
313        self.schema.arrow_schema().fields[index].clone()
314    }
315
316    /// Finds a column by name.
317    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
318        self.schema
319            .column_index_by_name(name)
320            .map(|index| &self.column_metadatas[index])
321    }
322
323    /// Returns all primary key columns.
324    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
325        // safety: RegionMetadata::validate ensures every primary key exists.
326        self.primary_key
327            .iter()
328            .map(|id| self.column_by_id(*id).unwrap())
329    }
330
331    /// Returns all field columns before projection.
332    ///
333    /// **Use with caution**. On read path where might have projection, this method
334    /// can return columns that not present in data batch.
335    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
336        self.column_metadatas
337            .iter()
338            .filter(|column| column.semantic_type == SemanticType::Field)
339    }
340
341    /// Returns a column's index in primary key if it is a primary key column.
342    ///
343    /// This does a linear search.
344    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
345        self.primary_key.iter().position(|id| *id == column_id)
346    }
347
348    /// Project the metadata to a new one using specified column ids.
349    ///
350    /// [RegionId] and schema version are preserved.
351    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
352        // check time index
353        ensure!(
354            projection.contains(&self.time_index),
355            TimeIndexNotFoundSnafu
356        );
357
358        // prepare new indices
359        let indices_to_preserve = projection
360            .iter()
361            .map(|id| {
362                self.column_index_by_id(*id)
363                    .with_context(|| InvalidRegionRequestSnafu {
364                        region_id: self.region_id,
365                        err: format!("column id {} not found", id),
366                    })
367            })
368            .collect::<Result<Vec<_>>>()?;
369
370        // project schema
371        let projected_schema =
372            self.schema
373                .try_project(&indices_to_preserve)
374                .with_context(|_| SchemaProjectSnafu {
375                    origin_schema: self.schema.clone(),
376                    projection: projection.to_vec(),
377                })?;
378
379        // project columns, generate projected primary key and new id_to_index
380        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
381        let mut projected_primary_key = vec![];
382        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
383        for index in indices_to_preserve {
384            let col = self.column_metadatas[index].clone();
385            if col.semantic_type == SemanticType::Tag {
386                projected_primary_key.push(col.column_id);
387            }
388            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
389            projected_column_metadatas.push(col);
390        }
391
392        Ok(RegionMetadata {
393            schema: Arc::new(projected_schema),
394            time_index: self.time_index,
395            id_to_index: projected_id_to_index,
396            column_metadatas: projected_column_metadatas,
397            primary_key: projected_primary_key,
398            region_id: self.region_id,
399            schema_version: self.schema_version,
400            primary_key_encoding: self.primary_key_encoding,
401            partition_expr: self.partition_expr.clone(),
402            partition_expr_version: partition_expr_version(self.partition_expr.as_deref()),
403        })
404    }
405
406    /// Gets the column ids to be indexed by inverted index.
407    pub fn inverted_indexed_column_ids<'a>(
408        &self,
409        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
410    ) -> HashSet<ColumnId> {
411        let mut inverted_index = self
412            .column_metadatas
413            .iter()
414            .filter(|column| column.column_schema.is_inverted_indexed())
415            .map(|column| column.column_id)
416            .collect::<HashSet<_>>();
417
418        for ignored in ignore_column_ids {
419            inverted_index.remove(ignored);
420        }
421
422        inverted_index
423    }
424
425    /// Gets the column IDs that have vector indexes along with their options.
426    /// Returns a map from column ID to the vector index options.
427    pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
428        self.column_metadatas
429            .iter()
430            .filter_map(|column| {
431                column
432                    .column_schema
433                    .vector_index_options()
434                    .ok()
435                    .flatten()
436                    .map(|options| (column.column_id, options))
437            })
438            .collect()
439    }
440
441    /// Checks whether the metadata is valid.
442    fn validate(&self) -> Result<()> {
443        // Id to name.
444        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
445        for col in &self.column_metadatas {
446            // Validate each column.
447            Self::validate_column_metadata(col)?;
448
449            // Check whether column id is duplicated. We already check column name
450            // is unique in `Schema` so we only check column id here.
451            ensure!(
452                !id_names.contains_key(&col.column_id),
453                InvalidMetaSnafu {
454                    reason: format!(
455                        "column {} and {} have the same column id {}",
456                        id_names[&col.column_id], col.column_schema.name, col.column_id,
457                    ),
458                }
459            );
460            id_names.insert(col.column_id, &col.column_schema.name);
461        }
462
463        // Checks there is only one time index.
464        let time_indexes = self
465            .column_metadatas
466            .iter()
467            .filter(|col| col.semantic_type == SemanticType::Timestamp)
468            .collect::<Vec<_>>();
469        ensure!(
470            time_indexes.len() == 1,
471            InvalidMetaSnafu {
472                reason: format!(
473                    "expect only one time index, found {}: {}",
474                    time_indexes.len(),
475                    time_indexes
476                        .iter()
477                        .map(|c| &c.column_schema.name)
478                        .join(", ")
479                ),
480            }
481        );
482
483        // Checks the time index column is not nullable.
484        ensure!(
485            !self.time_index_column().column_schema.is_nullable(),
486            InvalidMetaSnafu {
487                reason: format!(
488                    "time index column {} must be NOT NULL",
489                    self.time_index_column().column_schema.name
490                ),
491            }
492        );
493
494        if !self.primary_key.is_empty() {
495            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
496            // Checks column ids in the primary key is valid.
497            for column_id in &self.primary_key {
498                // Checks whether the column id exists.
499                ensure!(
500                    id_names.contains_key(column_id),
501                    InvalidMetaSnafu {
502                        reason: format!("unknown column id {}", column_id),
503                    }
504                );
505
506                // Safety: Column with specific id must exist.
507                let column = self.column_by_id(*column_id).unwrap();
508                // Checks duplicate.
509                ensure!(
510                    !pk_ids.contains(&column_id),
511                    InvalidMetaSnafu {
512                        reason: format!(
513                            "duplicate column {} in primary key",
514                            column.column_schema.name
515                        ),
516                    }
517                );
518
519                // Checks this is not a time index column.
520                ensure!(
521                    *column_id != self.time_index,
522                    InvalidMetaSnafu {
523                        reason: format!(
524                            "column {} is already a time index column",
525                            column.column_schema.name,
526                        ),
527                    }
528                );
529
530                // Checks semantic type.
531                ensure!(
532                    column.semantic_type == SemanticType::Tag,
533                    InvalidMetaSnafu {
534                        reason: format!(
535                            "semantic type of column {} should be Tag, not {:?}",
536                            column.column_schema.name, column.semantic_type
537                        ),
538                    }
539                );
540
541                pk_ids.insert(column_id);
542            }
543        }
544
545        // Checks tag semantic type.
546        let num_tag = self
547            .column_metadatas
548            .iter()
549            .filter(|col| col.semantic_type == SemanticType::Tag)
550            .count();
551        ensure!(
552            num_tag == self.primary_key.len(),
553            InvalidMetaSnafu {
554                reason: format!(
555                    "number of primary key columns {} not equal to tag columns {}",
556                    self.primary_key.len(),
557                    num_tag
558                ),
559            }
560        );
561
562        Ok(())
563    }
564
565    /// Checks whether it is a valid column.
566    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
567        if column_metadata.semantic_type == SemanticType::Timestamp {
568            ensure!(
569                column_metadata.column_schema.data_type.is_timestamp(),
570                InvalidMetaSnafu {
571                    reason: format!(
572                        "column `{}` is not timestamp type",
573                        column_metadata.column_schema.name
574                    ),
575                }
576            );
577        }
578
579        ensure!(
580            !is_internal_column(&column_metadata.column_schema.name),
581            InvalidMetaSnafu {
582                reason: format!(
583                    "{} is internal column name that can not be used",
584                    column_metadata.column_schema.name
585                ),
586            }
587        );
588
589        Ok(())
590    }
591}
592
/// Builder to build [RegionMetadata].
pub struct RegionMetadataBuilder {
    /// Id of the region the metadata is built for.
    region_id: RegionId,
    /// Columns of the region, in the order they will have in the built metadata.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered column ids of the primary key.
    primary_key: Vec<ColumnId>,
    /// Schema version assigned to the built metadata.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Partition expression in JSON string form, if any.
    partition_expr: Option<String>,
}
602
603impl RegionMetadataBuilder {
604    /// Returns a new builder.
605    pub fn new(id: RegionId) -> Self {
606        Self {
607            region_id: id,
608            column_metadatas: vec![],
609            primary_key: vec![],
610            schema_version: 0,
611            primary_key_encoding: PrimaryKeyEncoding::Dense,
612            partition_expr: None,
613        }
614    }
615
616    /// Creates a builder from existing [RegionMetadata].
617    pub fn from_existing(existing: RegionMetadata) -> Self {
618        Self {
619            column_metadatas: existing.column_metadatas,
620            primary_key: existing.primary_key,
621            region_id: existing.region_id,
622            schema_version: existing.schema_version,
623            primary_key_encoding: existing.primary_key_encoding,
624            partition_expr: existing.partition_expr,
625        }
626    }
627
    /// Sets the primary key encoding mode.
    ///
    /// Returns the builder so that calls can be chained.
    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
        self.primary_key_encoding = encoding;
        self
    }
633
    /// Sets the partition expression in JSON string form.
    ///
    /// Passing `None` clears the expression. Returns the builder so that calls
    /// can be chained.
    pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
        self.partition_expr = expr_json;
        self
    }
639
    /// Pushes a new column metadata to this region's metadata.
    ///
    /// The column is appended after the existing columns; the primary key is
    /// not modified. Returns the builder so that calls can be chained.
    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
        self.column_metadatas.push(column_metadata);
        self
    }
645
    /// Sets the primary key of the region.
    ///
    /// Replaces any previously set primary key. Returns the builder so that
    /// calls can be chained.
    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
        self.primary_key = key;
        self
    }
651
    /// Increases the schema version by 1.
    ///
    /// Returns the builder so that calls can be chained.
    pub fn bump_version(&mut self) -> &mut Self {
        self.schema_version += 1;
        self
    }
657
658    /// Applies the alter `kind` to the builder.
659    ///
660    /// The `kind` should be valid.
661    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
662        match kind {
663            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
664            AlterKind::DropColumns { names } => self.drop_columns(&names),
665            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
666            AlterKind::SetIndexes { options } => self.set_indexes(options)?,
667            AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
668            AlterKind::SetRegionOptions { options: _ } => {
669                // nothing to be done with RegionMetadata
670            }
671            AlterKind::UnsetRegionOptions { keys: _ } => {
672                // nothing to be done with RegionMetadata
673            }
674            AlterKind::DropDefaults { names } => {
675                self.drop_defaults(names)?;
676            }
677            AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
678            AlterKind::SyncColumns { column_metadatas } => {
679                self.primary_key = column_metadatas
680                    .iter()
681                    .filter_map(|column_metadata| {
682                        if column_metadata.semantic_type == SemanticType::Tag {
683                            Some(column_metadata.column_id)
684                        } else {
685                            None
686                        }
687                    })
688                    .collect::<Vec<_>>();
689                self.column_metadatas = column_metadatas;
690            }
691        }
692        Ok(self)
693    }
694
    /// Consumes the builder and build a [RegionMetadata].
    ///
    /// The built metadata is validated; an invalid metadata is returned as an error.
    pub fn build(self) -> Result<RegionMetadata> {
        self.build_with_options(true)
    }
699
    /// Builds metadata without running validation.
    ///
    /// Intended for file/external engines that should accept arbitrary schemas
    /// coming from files.
    ///
    /// This can still fail, because the derived fields are computed from the
    /// columns before the validation step is skipped.
    pub fn build_without_validation(self) -> Result<RegionMetadata> {
        self.build_with_options(false)
    }
707
708    fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
709        let skipped = SkippedFields::new(&self.column_metadatas)?;
710
711        let partition_expr_version = partition_expr_version(self.partition_expr.as_deref());
712        let meta = RegionMetadata {
713            schema: skipped.schema,
714            time_index: skipped.time_index,
715            id_to_index: skipped.id_to_index,
716            column_metadatas: self.column_metadatas,
717            primary_key: self.primary_key,
718            region_id: self.region_id,
719            schema_version: self.schema_version,
720            primary_key_encoding: self.primary_key_encoding,
721            partition_expr: self.partition_expr,
722            partition_expr_version,
723        };
724
725        if validate {
726            meta.validate()?;
727        }
728
729        Ok(meta)
730    }
731
732    /// Adds columns to the metadata if not exist.
733    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
734        let mut names: HashSet<_> = self
735            .column_metadatas
736            .iter()
737            .map(|col| col.column_schema.name.clone())
738            .collect();
739
740        for add_column in columns {
741            if names.contains(&add_column.column_metadata.column_schema.name) {
742                // Column already exists.
743                continue;
744            }
745
746            let column_id = add_column.column_metadata.column_id;
747            let semantic_type = add_column.column_metadata.semantic_type;
748            let column_name = add_column.column_metadata.column_schema.name.clone();
749            match add_column.location {
750                None => {
751                    self.column_metadatas.push(add_column.column_metadata);
752                }
753                Some(AddColumnLocation::First) => {
754                    self.column_metadatas.insert(0, add_column.column_metadata);
755                }
756                Some(AddColumnLocation::After { column_name }) => {
757                    let pos = self
758                        .column_metadatas
759                        .iter()
760                        .position(|col| col.column_schema.name == column_name)
761                        .context(InvalidRegionRequestSnafu {
762                            region_id: self.region_id,
763                            err: format!(
764                                "column {} not found, failed to add column {} after it",
765                                column_name, add_column.column_metadata.column_schema.name
766                            ),
767                        })?;
768                    // Insert after pos.
769                    self.column_metadatas
770                        .insert(pos + 1, add_column.column_metadata);
771                }
772            }
773            names.insert(column_name);
774            if semantic_type == SemanticType::Tag {
775                // For a new tag, we extend the primary key.
776                self.primary_key.push(column_id);
777            }
778        }
779
780        Ok(())
781    }
782
783    /// Drops columns from the metadata if exist.
784    fn drop_columns(&mut self, names: &[String]) {
785        let name_set: HashSet<_> = names.iter().collect();
786        self.column_metadatas
787            .retain(|col| !name_set.contains(&col.column_schema.name));
788    }
789
790    /// Changes columns type to the metadata if exist.
791    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
792        let mut change_type_map: HashMap<_, _> = columns
793            .into_iter()
794            .map(
795                |ModifyColumnType {
796                     column_name,
797                     target_type,
798                 }| (column_name, target_type),
799            )
800            .collect();
801
802        for column_meta in self.column_metadatas.iter_mut() {
803            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
804                column_meta.column_schema.data_type = target_type.clone();
805                // also cast default value to target_type if default value exist
806                let new_default =
807                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
808                        Some(
809                            default_value
810                                .cast_to_datatype(&target_type)
811                                .with_context(|_| CastDefaultValueSnafu {
812                                    reason: format!(
813                                        "Failed to cast default value from {:?} to type {:?}",
814                                        default_value, target_type
815                                    ),
816                                })?,
817                        )
818                    } else {
819                        None
820                    };
821                column_meta.column_schema = column_meta
822                    .column_schema
823                    .clone()
824                    .with_default_constraint(new_default.clone())
825                    .with_context(|_| CastDefaultValueSnafu {
826                        reason: format!("Failed to set new default: {:?}", new_default),
827                    })?;
828            }
829        }
830
831        Ok(())
832    }
833
834    fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
835        let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
836        for option in &options {
837            set_index_map
838                .entry(option.column_name())
839                .or_default()
840                .push(option);
841        }
842
843        for column_metadata in self.column_metadatas.iter_mut() {
844            if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
845                for option in options {
846                    Self::set_index(column_metadata, option)?;
847                }
848            }
849        }
850
851        Ok(())
852    }
853
854    fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
855        let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
856        for option in &options {
857            unset_index_map
858                .entry(option.column_name())
859                .or_default()
860                .push(option);
861        }
862
863        for column_metadata in self.column_metadatas.iter_mut() {
864            if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
865                for option in options {
866                    Self::unset_index(column_metadata, option)?;
867                }
868            }
869        }
870
871        Ok(())
872    }
873
874    fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
875        match options {
876            SetIndexOption::Fulltext {
877                column_name,
878                options,
879            } => {
880                ensure!(
881                    column_metadata.column_schema.data_type.is_string(),
882                    InvalidColumnOptionSnafu {
883                        column_name,
884                        msg: "FULLTEXT index only supports string type".to_string(),
885                    }
886                );
887                let current_fulltext_options = column_metadata
888                    .column_schema
889                    .fulltext_options()
890                    .with_context(|_| GetFulltextOptionsSnafu {
891                        column_name: column_name.clone(),
892                    })?;
893                set_column_fulltext_options(
894                    column_metadata,
895                    column_name,
896                    options,
897                    current_fulltext_options,
898                )?;
899            }
900            SetIndexOption::Inverted { .. } => {
901                column_metadata.column_schema.set_inverted_index(true)
902            }
903            SetIndexOption::Skipping {
904                column_name,
905                options,
906            } => {
907                column_metadata
908                    .column_schema
909                    .set_skipping_options(options)
910                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
911            }
912        }
913
914        Ok(())
915    }
916
917    fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
918        match options {
919            UnsetIndexOption::Fulltext { column_name } => {
920                ensure!(
921                    column_metadata.column_schema.data_type.is_string(),
922                    InvalidColumnOptionSnafu {
923                        column_name,
924                        msg: "FULLTEXT index only supports string type".to_string(),
925                    }
926                );
927
928                let current_fulltext_options = column_metadata
929                    .column_schema
930                    .fulltext_options()
931                    .with_context(|_| GetFulltextOptionsSnafu {
932                        column_name: column_name.clone(),
933                    })?;
934
935                unset_column_fulltext_options(
936                    column_metadata,
937                    column_name,
938                    current_fulltext_options,
939                )?;
940            }
941            UnsetIndexOption::Inverted { .. } => {
942                column_metadata.column_schema.set_inverted_index(false)
943            }
944            UnsetIndexOption::Skipping { column_name } => {
945                column_metadata
946                    .column_schema
947                    .unset_skipping_options()
948                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
949            }
950        }
951
952        Ok(())
953    }
954
955    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
956        for name in column_names.iter() {
957            let meta = self
958                .column_metadatas
959                .iter_mut()
960                .find(|col| col.column_schema.name == *name);
961            if let Some(meta) = meta {
962                if !meta.column_schema.is_nullable() {
963                    return InvalidRegionRequestSnafu {
964                        region_id: self.region_id,
965                        err: format!(
966                            "column {name} is not nullable and `default` cannot be dropped",
967                        ),
968                    }
969                    .fail();
970                }
971                meta.column_schema = meta
972                    .column_schema
973                    .clone()
974                    .with_default_constraint(None)
975                    .with_context(|_| CastDefaultValueSnafu {
976                        reason: format!("Failed to drop default : {name:?}"),
977                    })?;
978            } else {
979                return InvalidRegionRequestSnafu {
980                    region_id: self.region_id,
981                    err: format!("column {name} not found",),
982                }
983                .fail();
984            }
985        }
986        Ok(())
987    }
988
989    fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
990        for set_default in set_defaults.iter() {
991            let meta = self
992                .column_metadatas
993                .iter_mut()
994                .find(|col| col.column_schema.name == set_default.name);
995            if let Some(meta) = meta {
996                let default_constraint = common_sql::convert::deserialize_default_constraint(
997                    set_default.default_constraint.as_slice(),
998                    &meta.column_schema.name,
999                    &meta.column_schema.data_type,
1000                )
1001                .context(SqlCommonSnafu)?;
1002
1003                meta.column_schema = meta
1004                    .column_schema
1005                    .clone()
1006                    .with_default_constraint(default_constraint)
1007                    .with_context(|_| CastDefaultValueSnafu {
1008                        reason: format!("Failed to set default : {set_default:?}"),
1009                    })?;
1010            } else {
1011                return InvalidRegionRequestSnafu {
1012                    region_id: self.region_id,
1013                    err: format!("column {} not found", set_default.name),
1014                }
1015                .fail();
1016            }
1017        }
1018        Ok(())
1019    }
1020}
1021
1022/// Fields skipped in serialization.
1023struct SkippedFields {
1024    /// Last schema.
1025    schema: SchemaRef,
1026    /// Id of the time index column.
1027    time_index: ColumnId,
1028    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
1029    id_to_index: HashMap<ColumnId, usize>,
1030}
1031
1032impl SkippedFields {
1033    /// Constructs skipped fields from `column_metadatas`.
1034    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
1035        let column_schemas = column_metadatas
1036            .iter()
1037            .map(|column_metadata| column_metadata.column_schema.clone())
1038            .collect();
1039        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
1040        let time_index = column_metadatas
1041            .iter()
1042            .find_map(|col| {
1043                if col.semantic_type == SemanticType::Timestamp {
1044                    Some(col.column_id)
1045                } else {
1046                    None
1047                }
1048            })
1049            .context(InvalidMetaSnafu {
1050                reason: "time index not found",
1051            })?;
1052        let id_to_index = column_metadatas
1053            .iter()
1054            .enumerate()
1055            .map(|(idx, col)| (col.column_id, idx))
1056            .collect();
1057
1058        Ok(SkippedFields {
1059            schema,
1060            time_index,
1061            id_to_index,
1062        })
1063    }
1064}
1065
1066#[derive(Snafu)]
1067#[snafu(visibility(pub))]
1068#[stack_trace_debug]
1069pub enum MetadataError {
1070    #[snafu(display("Invalid schema"))]
1071    InvalidSchema {
1072        source: datatypes::error::Error,
1073        #[snafu(implicit)]
1074        location: Location,
1075    },
1076
1077    #[snafu(display("Invalid metadata, {}", reason))]
1078    InvalidMeta {
1079        reason: String,
1080        #[snafu(implicit)]
1081        location: Location,
1082    },
1083
1084    #[snafu(display("Failed to ser/de json object"))]
1085    SerdeJson {
1086        #[snafu(implicit)]
1087        location: Location,
1088        #[snafu(source)]
1089        error: serde_json::Error,
1090    },
1091
1092    #[snafu(display("Invalid raw region request, err: {}", err))]
1093    InvalidRawRegionRequest {
1094        err: String,
1095        #[snafu(implicit)]
1096        location: Location,
1097    },
1098
1099    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1100    InvalidRegionRequest {
1101        region_id: RegionId,
1102        err: String,
1103        #[snafu(implicit)]
1104        location: Location,
1105    },
1106
1107    #[snafu(display("Unexpected schema error during project"))]
1108    SchemaProject {
1109        origin_schema: SchemaRef,
1110        projection: Vec<ColumnId>,
1111        #[snafu(implicit)]
1112        location: Location,
1113        source: datatypes::Error,
1114    },
1115
1116    #[snafu(display("Time index column not found"))]
1117    TimeIndexNotFound {
1118        #[snafu(implicit)]
1119        location: Location,
1120    },
1121
1122    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1123    ChangeColumnNotFound {
1124        column_name: String,
1125        region_id: RegionId,
1126        #[snafu(implicit)]
1127        location: Location,
1128    },
1129
1130    #[snafu(display("Failed to convert column schema"))]
1131    ConvertColumnSchema {
1132        source: api::error::Error,
1133        #[snafu(implicit)]
1134        location: Location,
1135    },
1136
1137    #[snafu(display("Failed to convert TimeRanges"))]
1138    ConvertTimeRanges {
1139        source: api::error::Error,
1140        #[snafu(implicit)]
1141        location: Location,
1142    },
1143
1144    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1145    InvalidSetRegionOptionRequest {
1146        key: String,
1147        value: String,
1148        #[snafu(implicit)]
1149        location: Location,
1150    },
1151
1152    #[snafu(display("Invalid set region option request, key: {}", key))]
1153    InvalidUnsetRegionOptionRequest {
1154        key: String,
1155        #[snafu(implicit)]
1156        location: Location,
1157    },
1158
1159    #[snafu(display("Failed to decode protobuf"))]
1160    DecodeProto {
1161        #[snafu(source)]
1162        error: prost::UnknownEnumValue,
1163        #[snafu(implicit)]
1164        location: Location,
1165    },
1166
1167    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1168    InvalidColumnOption {
1169        column_name: String,
1170        msg: String,
1171        #[snafu(implicit)]
1172        location: Location,
1173    },
1174
1175    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1176    SetFulltextOptions {
1177        column_name: String,
1178        source: datatypes::Error,
1179        #[snafu(implicit)]
1180        location: Location,
1181    },
1182
1183    #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1184    GetFulltextOptions {
1185        column_name: String,
1186        source: datatypes::Error,
1187        #[snafu(implicit)]
1188        location: Location,
1189    },
1190
1191    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1192    SetSkippingIndexOptions {
1193        column_name: String,
1194        source: datatypes::Error,
1195        #[snafu(implicit)]
1196        location: Location,
1197    },
1198
1199    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1200    UnsetSkippingIndexOptions {
1201        column_name: String,
1202        source: datatypes::Error,
1203        #[snafu(implicit)]
1204        location: Location,
1205    },
1206
1207    #[snafu(display("Failed to decode arrow ipc record batches"))]
1208    DecodeArrowIpc {
1209        #[snafu(source)]
1210        error: arrow::error::ArrowError,
1211        #[snafu(implicit)]
1212        location: Location,
1213    },
1214
1215    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1216    CastDefaultValue {
1217        reason: String,
1218        source: datatypes::Error,
1219        #[snafu(implicit)]
1220        location: Location,
1221    },
1222
1223    #[snafu(display("Unexpected: {}", reason))]
1224    Unexpected {
1225        reason: String,
1226        #[snafu(implicit)]
1227        location: Location,
1228    },
1229
1230    #[snafu(display("Failed to encode/decode flight message"))]
1231    FlightCodec {
1232        source: common_grpc::Error,
1233        #[snafu(implicit)]
1234        location: Location,
1235    },
1236
1237    #[snafu(display("Invalid index option"))]
1238    InvalidIndexOption {
1239        #[snafu(implicit)]
1240        location: Location,
1241        #[snafu(source)]
1242        error: datatypes::error::Error,
1243    },
1244
1245    #[snafu(display("Sql common error"))]
1246    SqlCommon {
1247        source: common_sql::error::Error,
1248        #[snafu(implicit)]
1249        location: Location,
1250    },
1251}
1252
1253impl ErrorExt for MetadataError {
1254    fn status_code(&self) -> StatusCode {
1255        match self {
1256            Self::SqlCommon { source, .. } => source.status_code(),
1257            _ => StatusCode::InvalidArguments,
1258        }
1259    }
1260
1261    fn as_any(&self) -> &dyn Any {
1262        self
1263    }
1264}
1265
1266/// Set column fulltext options if it passed the validation.
1267///
1268/// Options allowed to modify:
1269/// * backend
1270///
1271/// Options not allowed to modify:
1272/// * analyzer
1273/// * case_sensitive
1274fn set_column_fulltext_options(
1275    column_meta: &mut ColumnMetadata,
1276    column_name: &str,
1277    options: &FulltextOptions,
1278    current_options: Option<FulltextOptions>,
1279) -> Result<()> {
1280    if let Some(current_options) = current_options {
1281        ensure!(
1282            current_options.analyzer == options.analyzer
1283                && current_options.case_sensitive == options.case_sensitive,
1284            InvalidColumnOptionSnafu {
1285                column_name,
1286                msg: format!(
1287                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1288                    current_options.analyzer, current_options.case_sensitive
1289                ),
1290            }
1291        );
1292    }
1293
1294    column_meta
1295        .column_schema
1296        .set_fulltext_options(options)
1297        .context(SetFulltextOptionsSnafu { column_name })?;
1298
1299    Ok(())
1300}
1301
1302fn unset_column_fulltext_options(
1303    column_meta: &mut ColumnMetadata,
1304    column_name: &str,
1305    current_options: Option<FulltextOptions>,
1306) -> Result<()> {
1307    if let Some(mut current_options) = current_options
1308        && current_options.enable
1309    {
1310        current_options.enable = false;
1311        column_meta
1312            .column_schema
1313            .set_fulltext_options(&current_options)
1314            .context(SetFulltextOptionsSnafu { column_name })?;
1315    } else {
1316        return InvalidColumnOptionSnafu {
1317            column_name,
1318            msg: "FULLTEXT index already disabled",
1319        }
1320        .fail();
1321    }
1322
1323    Ok(())
1324}
1325
1326#[cfg(test)]
1327mod test {
1328    use datatypes::prelude::ConcreteDataType;
1329    use datatypes::schema::{
1330        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1331    };
1332    use datatypes::value::Value;
1333
1334    use super::*;
1335
1336    fn create_builder() -> RegionMetadataBuilder {
1337        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1338    }
1339
1340    fn build_test_region_metadata() -> RegionMetadata {
1341        let mut builder = create_builder();
1342        builder
1343            .push_column_metadata(ColumnMetadata {
1344                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1345                semantic_type: SemanticType::Tag,
1346                column_id: 1,
1347            })
1348            .push_column_metadata(ColumnMetadata {
1349                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1350                semantic_type: SemanticType::Field,
1351                column_id: 2,
1352            })
1353            .push_column_metadata(ColumnMetadata {
1354                column_schema: ColumnSchema::new(
1355                    "c",
1356                    ConcreteDataType::timestamp_millisecond_datatype(),
1357                    false,
1358                ),
1359                semantic_type: SemanticType::Timestamp,
1360                column_id: 3,
1361            })
1362            .primary_key(vec![1])
1363            .partition_expr_json(Some("".to_string()));
1364        builder.build().unwrap()
1365    }
1366
1367    #[test]
1368    fn test_region_metadata() {
1369        let region_metadata = build_test_region_metadata();
1370        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1371        assert_eq!(
1372            "a",
1373            region_metadata.column_by_id(1).unwrap().column_schema.name
1374        );
1375        assert_eq!(None, region_metadata.column_by_id(10));
1376    }
1377
1378    #[test]
1379    fn test_region_metadata_serde() {
1380        let region_metadata = build_test_region_metadata();
1381        let serialized = serde_json::to_string(&region_metadata).unwrap();
1382        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1383        assert_eq!(region_metadata, deserialized);
1384    }
1385
1386    #[test]
1387    fn test_column_metadata_validate() {
1388        let mut builder = create_builder();
1389        let col = ColumnMetadata {
1390            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1391            semantic_type: SemanticType::Timestamp,
1392            column_id: 1,
1393        };
1394
1395        builder.push_column_metadata(col);
1396        let err = builder.build().unwrap_err();
1397        assert!(
1398            err.to_string()
1399                .contains("column `ts` is not timestamp type"),
1400            "unexpected err: {err}",
1401        );
1402    }
1403
1404    #[test]
1405    fn test_empty_region_metadata() {
1406        let builder = create_builder();
1407        let err = builder.build().unwrap_err();
1408        // A region must have a time index.
1409        assert!(
1410            err.to_string().contains("time index not found"),
1411            "unexpected err: {err}",
1412        );
1413    }
1414
1415    #[test]
1416    fn test_same_column_id() {
1417        let mut builder = create_builder();
1418        builder
1419            .push_column_metadata(ColumnMetadata {
1420                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1421                semantic_type: SemanticType::Tag,
1422                column_id: 1,
1423            })
1424            .push_column_metadata(ColumnMetadata {
1425                column_schema: ColumnSchema::new(
1426                    "b",
1427                    ConcreteDataType::timestamp_millisecond_datatype(),
1428                    false,
1429                ),
1430                semantic_type: SemanticType::Timestamp,
1431                column_id: 1,
1432            });
1433        let err = builder.build().unwrap_err();
1434        assert!(
1435            err.to_string()
1436                .contains("column a and b have the same column id"),
1437            "unexpected err: {err}",
1438        );
1439    }
1440
1441    #[test]
1442    fn test_duplicate_time_index() {
1443        let mut builder = create_builder();
1444        builder
1445            .push_column_metadata(ColumnMetadata {
1446                column_schema: ColumnSchema::new(
1447                    "a",
1448                    ConcreteDataType::timestamp_millisecond_datatype(),
1449                    false,
1450                ),
1451                semantic_type: SemanticType::Timestamp,
1452                column_id: 1,
1453            })
1454            .push_column_metadata(ColumnMetadata {
1455                column_schema: ColumnSchema::new(
1456                    "b",
1457                    ConcreteDataType::timestamp_millisecond_datatype(),
1458                    false,
1459                ),
1460                semantic_type: SemanticType::Timestamp,
1461                column_id: 2,
1462            });
1463        let err = builder.build().unwrap_err();
1464        assert!(
1465            err.to_string().contains("expect only one time index"),
1466            "unexpected err: {err}",
1467        );
1468    }
1469
1470    #[test]
1471    fn test_unknown_primary_key() {
1472        let mut builder = create_builder();
1473        builder
1474            .push_column_metadata(ColumnMetadata {
1475                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1476                semantic_type: SemanticType::Tag,
1477                column_id: 1,
1478            })
1479            .push_column_metadata(ColumnMetadata {
1480                column_schema: ColumnSchema::new(
1481                    "b",
1482                    ConcreteDataType::timestamp_millisecond_datatype(),
1483                    false,
1484                ),
1485                semantic_type: SemanticType::Timestamp,
1486                column_id: 2,
1487            })
1488            .primary_key(vec![3]);
1489        let err = builder.build().unwrap_err();
1490        assert!(
1491            err.to_string().contains("unknown column id 3"),
1492            "unexpected err: {err}",
1493        );
1494    }
1495
1496    #[test]
1497    fn test_same_primary_key() {
1498        let mut builder = create_builder();
1499        builder
1500            .push_column_metadata(ColumnMetadata {
1501                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1502                semantic_type: SemanticType::Tag,
1503                column_id: 1,
1504            })
1505            .push_column_metadata(ColumnMetadata {
1506                column_schema: ColumnSchema::new(
1507                    "b",
1508                    ConcreteDataType::timestamp_millisecond_datatype(),
1509                    false,
1510                ),
1511                semantic_type: SemanticType::Timestamp,
1512                column_id: 2,
1513            })
1514            .primary_key(vec![1, 1]);
1515        let err = builder.build().unwrap_err();
1516        assert!(
1517            err.to_string()
1518                .contains("duplicate column a in primary key"),
1519            "unexpected err: {err}",
1520        );
1521    }
1522
1523    #[test]
1524    fn test_in_time_index() {
1525        let mut builder = create_builder();
1526        builder
1527            .push_column_metadata(ColumnMetadata {
1528                column_schema: ColumnSchema::new(
1529                    "ts",
1530                    ConcreteDataType::timestamp_millisecond_datatype(),
1531                    false,
1532                ),
1533                semantic_type: SemanticType::Timestamp,
1534                column_id: 1,
1535            })
1536            .primary_key(vec![1]);
1537        let err = builder.build().unwrap_err();
1538        assert!(
1539            err.to_string()
1540                .contains("column ts is already a time index column"),
1541            "unexpected err: {err}",
1542        );
1543    }
1544
1545    #[test]
1546    fn test_nullable_time_index() {
1547        let mut builder = create_builder();
1548        builder.push_column_metadata(ColumnMetadata {
1549            column_schema: ColumnSchema::new(
1550                "ts",
1551                ConcreteDataType::timestamp_millisecond_datatype(),
1552                true,
1553            ),
1554            semantic_type: SemanticType::Timestamp,
1555            column_id: 1,
1556        });
1557        let err = builder.build().unwrap_err();
1558        assert!(
1559            err.to_string()
1560                .contains("time index column ts must be NOT NULL"),
1561            "unexpected err: {err}",
1562        );
1563    }
1564
1565    #[test]
1566    fn test_primary_key_semantic_type() {
1567        let mut builder = create_builder();
1568        builder
1569            .push_column_metadata(ColumnMetadata {
1570                column_schema: ColumnSchema::new(
1571                    "ts",
1572                    ConcreteDataType::timestamp_millisecond_datatype(),
1573                    false,
1574                ),
1575                semantic_type: SemanticType::Timestamp,
1576                column_id: 1,
1577            })
1578            .push_column_metadata(ColumnMetadata {
1579                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1580                semantic_type: SemanticType::Field,
1581                column_id: 2,
1582            })
1583            .primary_key(vec![2]);
1584        let err = builder.build().unwrap_err();
1585        assert!(
1586            err.to_string()
1587                .contains("semantic type of column a should be Tag, not Field"),
1588            "unexpected err: {err}",
1589        );
1590    }
1591
1592    #[test]
1593    fn test_primary_key_tag_num() {
1594        let mut builder = create_builder();
1595        builder
1596            .push_column_metadata(ColumnMetadata {
1597                column_schema: ColumnSchema::new(
1598                    "ts",
1599                    ConcreteDataType::timestamp_millisecond_datatype(),
1600                    false,
1601                ),
1602                semantic_type: SemanticType::Timestamp,
1603                column_id: 1,
1604            })
1605            .push_column_metadata(ColumnMetadata {
1606                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1607                semantic_type: SemanticType::Tag,
1608                column_id: 2,
1609            })
1610            .push_column_metadata(ColumnMetadata {
1611                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1612                semantic_type: SemanticType::Tag,
1613                column_id: 3,
1614            })
1615            .primary_key(vec![2]);
1616        let err = builder.build().unwrap_err();
1617        assert!(
1618            err.to_string()
1619                .contains("number of primary key columns 1 not equal to tag columns 2"),
1620            "unexpected err: {err}",
1621        );
1622    }
1623
1624    #[test]
1625    fn test_bump_version() {
1626        let mut region_metadata = build_test_region_metadata();
1627        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1628        builder.bump_version();
1629        let new_meta = builder.build().unwrap();
1630        region_metadata.schema_version += 1;
1631        assert_eq!(region_metadata, new_meta);
1632    }
1633
1634    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1635        let semantic_type = if is_tag {
1636            SemanticType::Tag
1637        } else {
1638            SemanticType::Field
1639        };
1640        ColumnMetadata {
1641            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1642            semantic_type,
1643            column_id,
1644        }
1645    }
1646
1647    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1648        let actual: Vec<_> = metadata
1649            .column_metadatas
1650            .iter()
1651            .map(|col| &col.column_schema.name)
1652            .collect();
1653        assert_eq!(names, actual);
1654    }
1655
1656    fn get_columns_default_constraint(
1657        metadata: &RegionMetadata,
1658        name: String,
1659    ) -> Option<Option<&ColumnDefaultConstraint>> {
1660        metadata.column_metadatas.iter().find_map(|col| {
1661            if col.column_schema.name == name {
1662                Some(col.column_schema.default_constraint())
1663            } else {
1664                None
1665            }
1666        })
1667    }
1668
1669    #[test]
1670    fn test_alter() {
1671        // a (tag), b (field), c (ts)
1672        let metadata = build_test_region_metadata();
1673        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1674        // tag d
1675        builder
1676            .alter(AlterKind::AddColumns {
1677                columns: vec![AddColumn {
1678                    column_metadata: new_column_metadata("d", true, 4),
1679                    location: None,
1680                }],
1681            })
1682            .unwrap();
1683        let metadata = builder.build().unwrap();
1684        check_columns(&metadata, &["a", "b", "c", "d"]);
1685        assert_eq!([1, 4], &metadata.primary_key[..]);
1686
1687        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1688        builder
1689            .alter(AlterKind::AddColumns {
1690                columns: vec![AddColumn {
1691                    column_metadata: new_column_metadata("e", false, 5),
1692                    location: Some(AddColumnLocation::First),
1693                }],
1694            })
1695            .unwrap();
1696        let metadata = builder.build().unwrap();
1697        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1698
1699        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1700        builder
1701            .alter(AlterKind::AddColumns {
1702                columns: vec![AddColumn {
1703                    column_metadata: new_column_metadata("f", false, 6),
1704                    location: Some(AddColumnLocation::After {
1705                        column_name: "b".to_string(),
1706                    }),
1707                }],
1708            })
1709            .unwrap();
1710        let metadata = builder.build().unwrap();
1711        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1712
1713        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1714        builder
1715            .alter(AlterKind::AddColumns {
1716                columns: vec![AddColumn {
1717                    column_metadata: new_column_metadata("g", false, 7),
1718                    location: Some(AddColumnLocation::After {
1719                        column_name: "d".to_string(),
1720                    }),
1721                }],
1722            })
1723            .unwrap();
1724        let metadata = builder.build().unwrap();
1725        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1726
1727        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1728        builder
1729            .alter(AlterKind::DropColumns {
1730                names: vec!["g".to_string(), "e".to_string()],
1731            })
1732            .unwrap();
1733        let metadata = builder.build().unwrap();
1734        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1735
1736        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1737        builder
1738            .alter(AlterKind::DropColumns {
1739                names: vec!["a".to_string()],
1740            })
1741            .unwrap();
1742        // Build returns error as the primary key contains a.
1743        let err = builder.build().unwrap_err();
1744        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1745
1746        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1747        let mut column_metadata = new_column_metadata("g", false, 8);
1748        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1749        column_metadata.column_schema = column_metadata
1750            .column_schema
1751            .with_default_constraint(default_constraint.clone())
1752            .unwrap();
1753        builder
1754            .alter(AlterKind::AddColumns {
1755                columns: vec![AddColumn {
1756                    column_metadata,
1757                    location: None,
1758                }],
1759            })
1760            .unwrap();
1761        let metadata = builder.build().unwrap();
1762        assert_eq!(
1763            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1764            default_constraint.as_ref()
1765        );
1766        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1767
1768        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1769        builder
1770            .alter(AlterKind::DropDefaults {
1771                names: vec!["g".to_string()],
1772            })
1773            .unwrap();
1774        let metadata = builder.build().unwrap();
1775        assert_eq!(
1776            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1777            None
1778        );
1779        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1780
1781        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1782        builder
1783            .alter(AlterKind::DropColumns {
1784                names: vec!["g".to_string()],
1785            })
1786            .unwrap();
1787        let metadata = builder.build().unwrap();
1788        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1789
1790        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1791        builder
1792            .alter(AlterKind::ModifyColumnTypes {
1793                columns: vec![ModifyColumnType {
1794                    column_name: "b".to_string(),
1795                    target_type: ConcreteDataType::string_datatype(),
1796                }],
1797            })
1798            .unwrap();
1799        let metadata = builder.build().unwrap();
1800        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1801        let b_type = &metadata
1802            .column_by_name("b")
1803            .unwrap()
1804            .column_schema
1805            .data_type;
1806        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1807
1808        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1809        builder
1810            .alter(AlterKind::SetIndexes {
1811                options: vec![SetIndexOption::Fulltext {
1812                    column_name: "b".to_string(),
1813                    options: FulltextOptions::new_unchecked(
1814                        true,
1815                        FulltextAnalyzer::Chinese,
1816                        true,
1817                        FulltextBackend::Bloom,
1818                        1000,
1819                        0.01,
1820                    ),
1821                }],
1822            })
1823            .unwrap();
1824        let metadata = builder.build().unwrap();
1825        let a_fulltext_options = metadata
1826            .column_by_name("b")
1827            .unwrap()
1828            .column_schema
1829            .fulltext_options()
1830            .unwrap()
1831            .unwrap();
1832        assert!(a_fulltext_options.enable);
1833        assert_eq!(
1834            datatypes::schema::FulltextAnalyzer::Chinese,
1835            a_fulltext_options.analyzer
1836        );
1837        assert!(a_fulltext_options.case_sensitive);
1838
1839        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1840        builder
1841            .alter(AlterKind::UnsetIndexes {
1842                options: vec![UnsetIndexOption::Fulltext {
1843                    column_name: "b".to_string(),
1844                }],
1845            })
1846            .unwrap();
1847        let metadata = builder.build().unwrap();
1848        let a_fulltext_options = metadata
1849            .column_by_name("b")
1850            .unwrap()
1851            .column_schema
1852            .fulltext_options()
1853            .unwrap()
1854            .unwrap();
1855        assert!(!a_fulltext_options.enable);
1856        assert_eq!(
1857            datatypes::schema::FulltextAnalyzer::Chinese,
1858            a_fulltext_options.analyzer
1859        );
1860        assert!(a_fulltext_options.case_sensitive);
1861    }
1862
1863    #[test]
1864    fn test_add_if_not_exists() {
1865        // a (tag), b (field), c (ts)
1866        let metadata = build_test_region_metadata();
1867        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1868        // tag d
1869        builder
1870            .alter(AlterKind::AddColumns {
1871                columns: vec![
1872                    AddColumn {
1873                        column_metadata: new_column_metadata("d", true, 4),
1874                        location: None,
1875                    },
1876                    AddColumn {
1877                        column_metadata: new_column_metadata("d", true, 4),
1878                        location: None,
1879                    },
1880                ],
1881            })
1882            .unwrap();
1883        let metadata = builder.build().unwrap();
1884        check_columns(&metadata, &["a", "b", "c", "d"]);
1885        assert_eq!([1, 4], &metadata.primary_key[..]);
1886
1887        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1888        // field b.
1889        builder
1890            .alter(AlterKind::AddColumns {
1891                columns: vec![AddColumn {
1892                    column_metadata: new_column_metadata("b", false, 2),
1893                    location: None,
1894                }],
1895            })
1896            .unwrap();
1897        let metadata = builder.build().unwrap();
1898        check_columns(&metadata, &["a", "b", "c", "d"]);
1899    }
1900
1901    #[test]
1902    fn test_add_column_with_inverted_index() {
1903        // only set inverted index to true explicitly will this column be inverted indexed
1904
1905        // a (tag), b (field), c (ts)
1906        let metadata = build_test_region_metadata();
1907        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1908        // tag d, e
1909        let mut col = new_column_metadata("d", true, 4);
1910        col.column_schema.set_inverted_index(true);
1911        builder
1912            .alter(AlterKind::AddColumns {
1913                columns: vec![
1914                    AddColumn {
1915                        column_metadata: col,
1916                        location: None,
1917                    },
1918                    AddColumn {
1919                        column_metadata: new_column_metadata("e", true, 5),
1920                        location: None,
1921                    },
1922                ],
1923            })
1924            .unwrap();
1925        let metadata = builder.build().unwrap();
1926        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1927        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1928        let column_metadata = metadata.column_by_name("a").unwrap();
1929        assert!(!column_metadata.column_schema.is_inverted_indexed());
1930        let column_metadata = metadata.column_by_name("b").unwrap();
1931        assert!(!column_metadata.column_schema.is_inverted_indexed());
1932        let column_metadata = metadata.column_by_name("c").unwrap();
1933        assert!(!column_metadata.column_schema.is_inverted_indexed());
1934        let column_metadata = metadata.column_by_name("d").unwrap();
1935        assert!(column_metadata.column_schema.is_inverted_indexed());
1936        let column_metadata = metadata.column_by_name("e").unwrap();
1937        assert!(!column_metadata.column_schema.is_inverted_indexed());
1938    }
1939
1940    #[test]
1941    fn test_drop_if_exists() {
1942        // a (tag), b (field), c (ts)
1943        let metadata = build_test_region_metadata();
1944        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1945        // field d, e
1946        builder
1947            .alter(AlterKind::AddColumns {
1948                columns: vec![
1949                    AddColumn {
1950                        column_metadata: new_column_metadata("d", false, 4),
1951                        location: None,
1952                    },
1953                    AddColumn {
1954                        column_metadata: new_column_metadata("e", false, 5),
1955                        location: None,
1956                    },
1957                ],
1958            })
1959            .unwrap();
1960        let metadata = builder.build().unwrap();
1961        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1962
1963        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1964        builder
1965            .alter(AlterKind::DropColumns {
1966                names: vec!["b".to_string(), "b".to_string()],
1967            })
1968            .unwrap();
1969        let metadata = builder.build().unwrap();
1970        check_columns(&metadata, &["a", "c", "d", "e"]);
1971
1972        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1973        builder
1974            .alter(AlterKind::DropColumns {
1975                names: vec!["b".to_string(), "e".to_string()],
1976            })
1977            .unwrap();
1978        let metadata = builder.build().unwrap();
1979        check_columns(&metadata, &["a", "c", "d"]);
1980    }
1981
1982    #[test]
1983    fn test_invalid_column_name() {
1984        let mut builder = create_builder();
1985        builder.push_column_metadata(ColumnMetadata {
1986            column_schema: ColumnSchema::new(
1987                "__sequence",
1988                ConcreteDataType::timestamp_millisecond_datatype(),
1989                false,
1990            ),
1991            semantic_type: SemanticType::Timestamp,
1992            column_id: 1,
1993        });
1994        let err = builder.build().unwrap_err();
1995        assert!(
1996            err.to_string()
1997                .contains("internal column name that can not be used"),
1998            "unexpected err: {err}",
1999        );
2000    }
2001
2002    #[test]
2003    fn test_allow_internal_column_name() {
2004        let mut builder = create_builder();
2005        builder
2006            .push_column_metadata(ColumnMetadata {
2007                column_schema: ColumnSchema::new(
2008                    "__primary_key",
2009                    ConcreteDataType::string_datatype(),
2010                    false,
2011                ),
2012                semantic_type: SemanticType::Tag,
2013                column_id: 1,
2014            })
2015            .push_column_metadata(ColumnMetadata {
2016                column_schema: ColumnSchema::new(
2017                    "ts",
2018                    ConcreteDataType::timestamp_millisecond_datatype(),
2019                    false,
2020                ),
2021                semantic_type: SemanticType::Timestamp,
2022                column_id: 2,
2023            })
2024            .primary_key(vec![1]);
2025
2026        let metadata = builder.build_without_validation().unwrap();
2027        assert_eq!(
2028            "__primary_key",
2029            metadata.column_metadatas[0].column_schema.name
2030        );
2031    }
2032
2033    #[test]
2034    fn test_build_without_validation() {
2035        // Primary key points to a Field column, which would normally fail validation.
2036        let mut builder = create_builder();
2037        builder
2038            .push_column_metadata(ColumnMetadata {
2039                column_schema: ColumnSchema::new(
2040                    "ts",
2041                    ConcreteDataType::timestamp_millisecond_datatype(),
2042                    false,
2043                ),
2044                semantic_type: SemanticType::Timestamp,
2045                column_id: 1,
2046            })
2047            .push_column_metadata(ColumnMetadata {
2048                column_schema: ColumnSchema::new(
2049                    "field",
2050                    ConcreteDataType::string_datatype(),
2051                    true,
2052                ),
2053                semantic_type: SemanticType::Field,
2054                column_id: 2,
2055            })
2056            .primary_key(vec![2]);
2057
2058        // Unvalidated build should succeed.
2059        let metadata = builder.build_without_validation().unwrap();
2060        assert_eq!(vec![2], metadata.primary_key);
2061
2062        // Validated build still rejects it.
2063        let mut builder = create_builder();
2064        builder
2065            .push_column_metadata(ColumnMetadata {
2066                column_schema: ColumnSchema::new(
2067                    "ts",
2068                    ConcreteDataType::timestamp_millisecond_datatype(),
2069                    false,
2070                ),
2071                semantic_type: SemanticType::Timestamp,
2072                column_id: 1,
2073            })
2074            .push_column_metadata(ColumnMetadata {
2075                column_schema: ColumnSchema::new(
2076                    "field",
2077                    ConcreteDataType::string_datatype(),
2078                    true,
2079                ),
2080                semantic_type: SemanticType::Field,
2081                column_id: 2,
2082            })
2083            .primary_key(vec![2]);
2084        let err = builder.build().unwrap_err();
2085        assert!(
2086            err.to_string()
2087                .contains("semantic type of column field should be Tag"),
2088            "unexpected err: {err}"
2089        );
2090    }
2091
2092    #[test]
2093    fn test_debug_for_column_metadata() {
2094        let region_metadata = build_test_region_metadata();
2095        let formatted = format!("{:?}", region_metadata);
2096        assert_eq!(
2097            formatted,
2098            "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
2099        );
2100    }
2101
2102    #[test]
2103    fn test_region_metadata_deserialize_default_primary_key_encoding() {
2104        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
2105        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2106        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
2107
2108        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
2109        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2110        assert_eq!(
2111            deserialized.primary_key_encoding,
2112            PrimaryKeyEncoding::Sparse
2113        );
2114    }
2115}