store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::column_def::try_as_column_schema;
25use api::v1::region::RegionColumnDef;
26use api::v1::SemanticType;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
33use datatypes::types::TimestampType;
34use serde::de::Error;
35use serde::{Deserialize, Deserializer, Serialize};
36use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
37
38use crate::codec::PrimaryKeyEncoding;
39use crate::region_request::{
40    AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, ApiUnsetIndexOptions,
41    ModifyColumnType,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type used throughout this module, with [MetadataError] as the error type.
pub type Result<T> = std::result::Result<T, MetadataError>;
47
/// Metadata of a column.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Immutable and unique id of a column.
    pub column_id: ColumnId,
}
58
59impl fmt::Debug for ColumnMetadata {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(
62            f,
63            "[{:?} {:?} {:?}]",
64            self.column_schema, self.semantic_type, self.column_id,
65        )
66    }
67}
68
69impl ColumnMetadata {
70    /// Construct `Self` from protobuf struct [RegionColumnDef]
71    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72        let column_id = column_def.column_id;
73        let column_def = column_def
74            .column_def
75            .context(InvalidRawRegionRequestSnafu {
76                err: "column_def is absent",
77            })?;
78        let semantic_type = column_def.semantic_type();
79        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81        Ok(Self {
82            column_schema,
83            semantic_type,
84            column_id,
85        })
86    }
87
88    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
89    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90        serde_json::to_vec(columns)
91    }
92
93    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
94    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95        serde_json::from_slice(bytes)
96    }
97
98    pub fn is_same_datatype(&self, other: &Self) -> bool {
99        self.column_schema.data_type == other.column_schema.data_type
100    }
101}
102
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    pub schema: SchemaRef,

    // We don't make `time_index` and `id_to_index` public and always construct them
    // via [SkippedFields], so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
158
159impl fmt::Debug for RegionMetadata {
160    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
161        f.debug_struct("RegionMetadata")
162            .field("column_metadatas", &self.column_metadatas)
163            .field("time_index", &self.time_index)
164            .field("primary_key", &self.primary_key)
165            .field("region_id", &self.region_id)
166            .field("schema_version", &self.schema_version)
167            .finish()
168    }
169}
170
/// Shared, reference-counted handle to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
172
173impl<'de> Deserialize<'de> for RegionMetadata {
174    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
175    where
176        D: Deserializer<'de>,
177    {
178        // helper internal struct for deserialization
179        #[derive(Deserialize)]
180        struct RegionMetadataWithoutSchema {
181            column_metadatas: Vec<ColumnMetadata>,
182            primary_key: Vec<ColumnId>,
183            region_id: RegionId,
184            schema_version: u64,
185            #[serde(default)]
186            primary_key_encoding: PrimaryKeyEncoding,
187        }
188
189        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
190        let skipped =
191            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
192
193        Ok(Self {
194            schema: skipped.schema,
195            time_index: skipped.time_index,
196            id_to_index: skipped.id_to_index,
197            column_metadatas: without_schema.column_metadatas,
198            primary_key: without_schema.primary_key,
199            region_id: without_schema.region_id,
200            schema_version: without_schema.schema_version,
201            primary_key_encoding: without_schema.primary_key_encoding,
202        })
203    }
204}
205
206impl RegionMetadata {
207    /// Decode the metadata from a JSON str.
208    pub fn from_json(s: &str) -> Result<Self> {
209        serde_json::from_str(s).context(SerdeJsonSnafu)
210    }
211
212    /// Encode the metadata to a JSON string.
213    pub fn to_json(&self) -> Result<String> {
214        serde_json::to_string(&self).context(SerdeJsonSnafu)
215    }
216
217    /// Find column by id.
218    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
219        self.id_to_index
220            .get(&column_id)
221            .map(|index| &self.column_metadatas[*index])
222    }
223
224    /// Find column index by id.
225    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
226        self.id_to_index.get(&column_id).copied()
227    }
228
229    /// Find column index by name.
230    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
231        self.column_metadatas
232            .iter()
233            .position(|col| col.column_schema.name == column_name)
234    }
235
236    /// Returns the time index column
237    ///
238    /// # Panics
239    /// Panics if the time index column id is invalid.
240    pub fn time_index_column(&self) -> &ColumnMetadata {
241        let index = self.id_to_index[&self.time_index];
242        &self.column_metadatas[index]
243    }
244
245    /// Returns timestamp type of time index column
246    ///
247    /// # Panics
248    /// Panics if the time index column id is invalid.
249    pub fn time_index_type(&self) -> TimestampType {
250        let index = self.id_to_index[&self.time_index];
251        self.column_metadatas[index]
252            .column_schema
253            .data_type
254            .as_timestamp()
255            .unwrap()
256    }
257
258    /// Returns the position of the time index.
259    pub fn time_index_column_pos(&self) -> usize {
260        self.id_to_index[&self.time_index]
261    }
262
263    /// Returns the arrow field of the time index column.
264    pub fn time_index_field(&self) -> FieldRef {
265        let index = self.id_to_index[&self.time_index];
266        self.schema.arrow_schema().fields[index].clone()
267    }
268
269    /// Finds a column by name.
270    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
271        self.schema
272            .column_index_by_name(name)
273            .map(|index| &self.column_metadatas[index])
274    }
275
276    /// Returns all primary key columns.
277    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
278        // safety: RegionMetadata::validate ensures every primary key exists.
279        self.primary_key
280            .iter()
281            .map(|id| self.column_by_id(*id).unwrap())
282    }
283
284    /// Returns all field columns before projection.
285    ///
286    /// **Use with caution**. On read path where might have projection, this method
287    /// can return columns that not present in data batch.
288    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
289        self.column_metadatas
290            .iter()
291            .filter(|column| column.semantic_type == SemanticType::Field)
292    }
293
294    /// Returns a column's index in primary key if it is a primary key column.
295    ///
296    /// This does a linear search.
297    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
298        self.primary_key.iter().position(|id| *id == column_id)
299    }
300
301    /// Project the metadata to a new one using specified column ids.
302    ///
303    /// [RegionId] and schema version are preserved.
304    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
305        // check time index
306        ensure!(
307            projection.contains(&self.time_index),
308            TimeIndexNotFoundSnafu
309        );
310
311        // prepare new indices
312        let indices_to_preserve = projection
313            .iter()
314            .map(|id| {
315                self.column_index_by_id(*id)
316                    .with_context(|| InvalidRegionRequestSnafu {
317                        region_id: self.region_id,
318                        err: format!("column id {} not found", id),
319                    })
320            })
321            .collect::<Result<Vec<_>>>()?;
322
323        // project schema
324        let projected_schema =
325            self.schema
326                .try_project(&indices_to_preserve)
327                .with_context(|_| SchemaProjectSnafu {
328                    origin_schema: self.schema.clone(),
329                    projection: projection.to_vec(),
330                })?;
331
332        // project columns, generate projected primary key and new id_to_index
333        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
334        let mut projected_primary_key = vec![];
335        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
336        for index in indices_to_preserve {
337            let col = self.column_metadatas[index].clone();
338            if col.semantic_type == SemanticType::Tag {
339                projected_primary_key.push(col.column_id);
340            }
341            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
342            projected_column_metadatas.push(col);
343        }
344
345        Ok(RegionMetadata {
346            schema: Arc::new(projected_schema),
347            time_index: self.time_index,
348            id_to_index: projected_id_to_index,
349            column_metadatas: projected_column_metadatas,
350            primary_key: projected_primary_key,
351            region_id: self.region_id,
352            schema_version: self.schema_version,
353            primary_key_encoding: self.primary_key_encoding,
354        })
355    }
356
357    /// Gets the column ids to be indexed by inverted index.
358    pub fn inverted_indexed_column_ids<'a>(
359        &self,
360        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
361    ) -> HashSet<ColumnId> {
362        let mut inverted_index = self
363            .column_metadatas
364            .iter()
365            .filter(|column| column.column_schema.is_inverted_indexed())
366            .map(|column| column.column_id)
367            .collect::<HashSet<_>>();
368
369        for ignored in ignore_column_ids {
370            inverted_index.remove(ignored);
371        }
372
373        inverted_index
374    }
375
376    /// Checks whether the metadata is valid.
377    fn validate(&self) -> Result<()> {
378        // Id to name.
379        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
380        for col in &self.column_metadatas {
381            // Validate each column.
382            Self::validate_column_metadata(col)?;
383
384            // Check whether column id is duplicated. We already check column name
385            // is unique in `Schema` so we only check column id here.
386            ensure!(
387                !id_names.contains_key(&col.column_id),
388                InvalidMetaSnafu {
389                    reason: format!(
390                        "column {} and {} have the same column id {}",
391                        id_names[&col.column_id], col.column_schema.name, col.column_id,
392                    ),
393                }
394            );
395            id_names.insert(col.column_id, &col.column_schema.name);
396        }
397
398        // Checks there is only one time index.
399        let num_time_index = self
400            .column_metadatas
401            .iter()
402            .filter(|col| col.semantic_type == SemanticType::Timestamp)
403            .count();
404        ensure!(
405            num_time_index == 1,
406            InvalidMetaSnafu {
407                reason: format!("expect only one time index, found {}", num_time_index),
408            }
409        );
410
411        // Checks the time index column is not nullable.
412        ensure!(
413            !self.time_index_column().column_schema.is_nullable(),
414            InvalidMetaSnafu {
415                reason: format!(
416                    "time index column {} must be NOT NULL",
417                    self.time_index_column().column_schema.name
418                ),
419            }
420        );
421
422        if !self.primary_key.is_empty() {
423            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
424            // Checks column ids in the primary key is valid.
425            for column_id in &self.primary_key {
426                // Checks whether the column id exists.
427                ensure!(
428                    id_names.contains_key(column_id),
429                    InvalidMetaSnafu {
430                        reason: format!("unknown column id {}", column_id),
431                    }
432                );
433
434                // Safety: Column with specific id must exist.
435                let column = self.column_by_id(*column_id).unwrap();
436                // Checks duplicate.
437                ensure!(
438                    !pk_ids.contains(&column_id),
439                    InvalidMetaSnafu {
440                        reason: format!(
441                            "duplicate column {} in primary key",
442                            column.column_schema.name
443                        ),
444                    }
445                );
446
447                // Checks this is not a time index column.
448                ensure!(
449                    *column_id != self.time_index,
450                    InvalidMetaSnafu {
451                        reason: format!(
452                            "column {} is already a time index column",
453                            column.column_schema.name,
454                        ),
455                    }
456                );
457
458                // Checks semantic type.
459                ensure!(
460                    column.semantic_type == SemanticType::Tag,
461                    InvalidMetaSnafu {
462                        reason: format!(
463                            "semantic type of column {} should be Tag, not {:?}",
464                            column.column_schema.name, column.semantic_type
465                        ),
466                    }
467                );
468
469                pk_ids.insert(column_id);
470            }
471        }
472
473        // Checks tag semantic type.
474        let num_tag = self
475            .column_metadatas
476            .iter()
477            .filter(|col| col.semantic_type == SemanticType::Tag)
478            .count();
479        ensure!(
480            num_tag == self.primary_key.len(),
481            InvalidMetaSnafu {
482                reason: format!(
483                    "number of primary key columns {} not equal to tag columns {}",
484                    self.primary_key.len(),
485                    num_tag
486                ),
487            }
488        );
489
490        Ok(())
491    }
492
493    /// Checks whether it is a valid column.
494    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
495        if column_metadata.semantic_type == SemanticType::Timestamp {
496            ensure!(
497                column_metadata.column_schema.data_type.is_timestamp(),
498                InvalidMetaSnafu {
499                    reason: format!(
500                        "column `{}` is not timestamp type",
501                        column_metadata.column_schema.name
502                    ),
503                }
504            );
505        }
506
507        ensure!(
508            !is_internal_column(&column_metadata.column_schema.name),
509            InvalidMetaSnafu {
510                reason: format!(
511                    "{} is internal column name that can not be used",
512                    column_metadata.column_schema.name
513                ),
514            }
515        );
516
517        Ok(())
518    }
519}
520
/// Builder to build [RegionMetadata].
pub struct RegionMetadataBuilder {
    /// Id of the region the metadata belongs to.
    region_id: RegionId,
    /// Columns of the region, in the order they will have in the built metadata.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered column ids of the primary key.
    primary_key: Vec<ColumnId>,
    /// Schema version that the built metadata will carry.
    schema_version: u64,
    /// Primary key encoding mode that the built metadata will carry.
    primary_key_encoding: PrimaryKeyEncoding,
}
529
530impl RegionMetadataBuilder {
531    /// Returns a new builder.
532    pub fn new(id: RegionId) -> Self {
533        Self {
534            region_id: id,
535            column_metadatas: vec![],
536            primary_key: vec![],
537            schema_version: 0,
538            primary_key_encoding: PrimaryKeyEncoding::Dense,
539        }
540    }
541
542    /// Creates a builder from existing [RegionMetadata].
543    pub fn from_existing(existing: RegionMetadata) -> Self {
544        Self {
545            column_metadatas: existing.column_metadatas,
546            primary_key: existing.primary_key,
547            region_id: existing.region_id,
548            schema_version: existing.schema_version,
549            primary_key_encoding: existing.primary_key_encoding,
550        }
551    }
552
553    /// Sets the primary key encoding mode.
554    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
555        self.primary_key_encoding = encoding;
556        self
557    }
558
559    /// Pushes a new column metadata to this region's metadata.
560    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
561        self.column_metadatas.push(column_metadata);
562        self
563    }
564
565    /// Sets the primary key of the region.
566    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
567        self.primary_key = key;
568        self
569    }
570
571    /// Increases the schema version by 1.
572    pub fn bump_version(&mut self) -> &mut Self {
573        self.schema_version += 1;
574        self
575    }
576
577    /// Applies the alter `kind` to the builder.
578    ///
579    /// The `kind` should be valid.
580    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
581        match kind {
582            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
583            AlterKind::DropColumns { names } => self.drop_columns(&names),
584            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
585            AlterKind::SetIndex { options } => match options {
586                ApiSetIndexOptions::Fulltext {
587                    column_name,
588                    options,
589                } => self.change_column_fulltext_options(column_name, true, Some(options))?,
590                ApiSetIndexOptions::Inverted { column_name } => {
591                    self.change_column_inverted_index_options(column_name, true)?
592                }
593                ApiSetIndexOptions::Skipping {
594                    column_name,
595                    options,
596                } => self.change_column_skipping_index_options(column_name, Some(options))?,
597            },
598            AlterKind::UnsetIndex { options } => match options {
599                ApiUnsetIndexOptions::Fulltext { column_name } => {
600                    self.change_column_fulltext_options(column_name, false, None)?
601                }
602                ApiUnsetIndexOptions::Inverted { column_name } => {
603                    self.change_column_inverted_index_options(column_name, false)?
604                }
605                ApiUnsetIndexOptions::Skipping { column_name } => {
606                    self.change_column_skipping_index_options(column_name, None)?
607                }
608            },
609            AlterKind::SetRegionOptions { options: _ } => {
610                // nothing to be done with RegionMetadata
611            }
612            AlterKind::UnsetRegionOptions { keys: _ } => {
613                // nothing to be done with RegionMetadata
614            }
615        }
616        Ok(self)
617    }
618
619    /// Consumes the builder and build a [RegionMetadata].
620    pub fn build(self) -> Result<RegionMetadata> {
621        let skipped = SkippedFields::new(&self.column_metadatas)?;
622
623        let meta = RegionMetadata {
624            schema: skipped.schema,
625            time_index: skipped.time_index,
626            id_to_index: skipped.id_to_index,
627            column_metadatas: self.column_metadatas,
628            primary_key: self.primary_key,
629            region_id: self.region_id,
630            schema_version: self.schema_version,
631            primary_key_encoding: self.primary_key_encoding,
632        };
633
634        meta.validate()?;
635
636        Ok(meta)
637    }
638
639    /// Adds columns to the metadata if not exist.
640    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
641        let mut names: HashSet<_> = self
642            .column_metadatas
643            .iter()
644            .map(|col| col.column_schema.name.clone())
645            .collect();
646
647        for add_column in columns {
648            if names.contains(&add_column.column_metadata.column_schema.name) {
649                // Column already exists.
650                continue;
651            }
652
653            let column_id = add_column.column_metadata.column_id;
654            let semantic_type = add_column.column_metadata.semantic_type;
655            let column_name = add_column.column_metadata.column_schema.name.clone();
656            match add_column.location {
657                None => {
658                    self.column_metadatas.push(add_column.column_metadata);
659                }
660                Some(AddColumnLocation::First) => {
661                    self.column_metadatas.insert(0, add_column.column_metadata);
662                }
663                Some(AddColumnLocation::After { column_name }) => {
664                    let pos = self
665                        .column_metadatas
666                        .iter()
667                        .position(|col| col.column_schema.name == column_name)
668                        .context(InvalidRegionRequestSnafu {
669                            region_id: self.region_id,
670                            err: format!(
671                                "column {} not found, failed to add column {} after it",
672                                column_name, add_column.column_metadata.column_schema.name
673                            ),
674                        })?;
675                    // Insert after pos.
676                    self.column_metadatas
677                        .insert(pos + 1, add_column.column_metadata);
678                }
679            }
680            names.insert(column_name);
681            if semantic_type == SemanticType::Tag {
682                // For a new tag, we extend the primary key.
683                self.primary_key.push(column_id);
684            }
685        }
686
687        Ok(())
688    }
689
690    /// Drops columns from the metadata if exist.
691    fn drop_columns(&mut self, names: &[String]) {
692        let name_set: HashSet<_> = names.iter().collect();
693        self.column_metadatas
694            .retain(|col| !name_set.contains(&col.column_schema.name));
695    }
696
697    /// Changes columns type to the metadata if exist.
698    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
699        let mut change_type_map: HashMap<_, _> = columns
700            .into_iter()
701            .map(
702                |ModifyColumnType {
703                     column_name,
704                     target_type,
705                 }| (column_name, target_type),
706            )
707            .collect();
708
709        for column_meta in self.column_metadatas.iter_mut() {
710            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
711                column_meta.column_schema.data_type = target_type.clone();
712                // also cast default value to target_type if default value exist
713                let new_default =
714                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
715                        Some(
716                            default_value
717                                .cast_to_datatype(&target_type)
718                                .with_context(|_| CastDefaultValueSnafu {
719                                    reason: format!(
720                                        "Failed to cast default value from {:?} to type {:?}",
721                                        default_value, target_type
722                                    ),
723                                })?,
724                        )
725                    } else {
726                        None
727                    };
728                column_meta.column_schema = column_meta
729                    .column_schema
730                    .clone()
731                    .with_default_constraint(new_default.clone())
732                    .with_context(|_| CastDefaultValueSnafu {
733                        reason: format!("Failed to set new default: {:?}", new_default),
734                    })?;
735            }
736        }
737
738        Ok(())
739    }
740
741    fn change_column_inverted_index_options(
742        &mut self,
743        column_name: String,
744        value: bool,
745    ) -> Result<()> {
746        for column_meta in self.column_metadatas.iter_mut() {
747            if column_meta.column_schema.name == column_name {
748                column_meta.column_schema.set_inverted_index(value)
749            }
750        }
751        Ok(())
752    }
753
754    fn change_column_fulltext_options(
755        &mut self,
756        column_name: String,
757        enable: bool,
758        options: Option<FulltextOptions>,
759    ) -> Result<()> {
760        for column_meta in self.column_metadatas.iter_mut() {
761            if column_meta.column_schema.name == column_name {
762                ensure!(
763                    column_meta.column_schema.data_type.is_string(),
764                    InvalidColumnOptionSnafu {
765                        column_name,
766                        msg: "FULLTEXT index only supports string type".to_string(),
767                    }
768                );
769
770                let current_fulltext_options = column_meta
771                    .column_schema
772                    .fulltext_options()
773                    .context(SetFulltextOptionsSnafu {
774                        column_name: column_name.clone(),
775                    })?;
776
777                if enable {
778                    ensure!(
779                        options.is_some(),
780                        InvalidColumnOptionSnafu {
781                            column_name,
782                            msg: "FULLTEXT index options must be provided",
783                        }
784                    );
785                    set_column_fulltext_options(
786                        column_meta,
787                        column_name,
788                        options.unwrap(),
789                        current_fulltext_options,
790                    )?;
791                } else {
792                    unset_column_fulltext_options(
793                        column_meta,
794                        column_name,
795                        current_fulltext_options,
796                    )?;
797                }
798                break;
799            }
800        }
801        Ok(())
802    }
803
804    fn change_column_skipping_index_options(
805        &mut self,
806        column_name: String,
807        options: Option<SkippingIndexOptions>,
808    ) -> Result<()> {
809        for column_meta in self.column_metadatas.iter_mut() {
810            if column_meta.column_schema.name == column_name {
811                if let Some(options) = &options {
812                    column_meta
813                        .column_schema
814                        .set_skipping_options(options)
815                        .context(UnsetSkippingIndexOptionsSnafu {
816                            column_name: column_name.clone(),
817                        })?;
818                } else {
819                    column_meta.column_schema.unset_skipping_options().context(
820                        UnsetSkippingIndexOptionsSnafu {
821                            column_name: column_name.clone(),
822                        },
823                    )?;
824                }
825            }
826        }
827        Ok(())
828    }
829}
830
831/// Fields skipped in serialization.
832struct SkippedFields {
833    /// Last schema.
834    schema: SchemaRef,
835    /// Id of the time index column.
836    time_index: ColumnId,
837    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
838    id_to_index: HashMap<ColumnId, usize>,
839}
840
841impl SkippedFields {
842    /// Constructs skipped fields from `column_metadatas`.
843    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
844        let column_schemas = column_metadatas
845            .iter()
846            .map(|column_metadata| column_metadata.column_schema.clone())
847            .collect();
848        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
849        let time_index = column_metadatas
850            .iter()
851            .find_map(|col| {
852                if col.semantic_type == SemanticType::Timestamp {
853                    Some(col.column_id)
854                } else {
855                    None
856                }
857            })
858            .context(InvalidMetaSnafu {
859                reason: "time index not found",
860            })?;
861        let id_to_index = column_metadatas
862            .iter()
863            .enumerate()
864            .map(|(idx, col)| (col.column_id, idx))
865            .collect();
866
867        Ok(SkippedFields {
868            schema,
869            time_index,
870            id_to_index,
871        })
872    }
873}
874
875#[derive(Snafu)]
876#[snafu(visibility(pub))]
877#[stack_trace_debug]
878pub enum MetadataError {
879    #[snafu(display("Invalid schema"))]
880    InvalidSchema {
881        source: datatypes::error::Error,
882        #[snafu(implicit)]
883        location: Location,
884    },
885
886    #[snafu(display("Invalid metadata, {}", reason))]
887    InvalidMeta {
888        reason: String,
889        #[snafu(implicit)]
890        location: Location,
891    },
892
893    #[snafu(display("Failed to ser/de json object"))]
894    SerdeJson {
895        #[snafu(implicit)]
896        location: Location,
897        #[snafu(source)]
898        error: serde_json::Error,
899    },
900
901    #[snafu(display("Invalid raw region request, err: {}", err))]
902    InvalidRawRegionRequest {
903        err: String,
904        #[snafu(implicit)]
905        location: Location,
906    },
907
908    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
909    InvalidRegionRequest {
910        region_id: RegionId,
911        err: String,
912        #[snafu(implicit)]
913        location: Location,
914    },
915
916    #[snafu(display("Unexpected schema error during project"))]
917    SchemaProject {
918        origin_schema: SchemaRef,
919        projection: Vec<ColumnId>,
920        #[snafu(implicit)]
921        location: Location,
922        source: datatypes::Error,
923    },
924
925    #[snafu(display("Time index column not found"))]
926    TimeIndexNotFound {
927        #[snafu(implicit)]
928        location: Location,
929    },
930
931    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
932    ChangeColumnNotFound {
933        column_name: String,
934        region_id: RegionId,
935        #[snafu(implicit)]
936        location: Location,
937    },
938
939    #[snafu(display("Failed to convert column schema"))]
940    ConvertColumnSchema {
941        source: api::error::Error,
942        #[snafu(implicit)]
943        location: Location,
944    },
945
946    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
947    InvalidSetRegionOptionRequest {
948        key: String,
949        value: String,
950        #[snafu(implicit)]
951        location: Location,
952    },
953
954    #[snafu(display("Invalid set region option request, key: {}", key))]
955    InvalidUnsetRegionOptionRequest {
956        key: String,
957        #[snafu(implicit)]
958        location: Location,
959    },
960
961    #[snafu(display("Failed to decode protobuf"))]
962    DecodeProto {
963        #[snafu(source)]
964        error: prost::UnknownEnumValue,
965        #[snafu(implicit)]
966        location: Location,
967    },
968
969    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
970    InvalidColumnOption {
971        column_name: String,
972        msg: String,
973        #[snafu(implicit)]
974        location: Location,
975    },
976
977    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
978    SetFulltextOptions {
979        column_name: String,
980        source: datatypes::Error,
981        #[snafu(implicit)]
982        location: Location,
983    },
984
985    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
986    SetSkippingIndexOptions {
987        column_name: String,
988        source: datatypes::Error,
989        #[snafu(implicit)]
990        location: Location,
991    },
992
993    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
994    UnsetSkippingIndexOptions {
995        column_name: String,
996        source: datatypes::Error,
997        #[snafu(implicit)]
998        location: Location,
999    },
1000
1001    #[snafu(display("Failed to decode arrow ipc record batches"))]
1002    DecodeArrowIpc {
1003        #[snafu(source)]
1004        error: arrow::error::ArrowError,
1005        #[snafu(implicit)]
1006        location: Location,
1007    },
1008
1009    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1010    CastDefaultValue {
1011        reason: String,
1012        source: datatypes::Error,
1013        #[snafu(implicit)]
1014        location: Location,
1015    },
1016
1017    #[snafu(display("Unexpected: {}", reason))]
1018    Unexpected {
1019        reason: String,
1020        #[snafu(implicit)]
1021        location: Location,
1022    },
1023
1024    #[snafu(display("Failed to encode/decode flight message"))]
1025    FlightCodec {
1026        source: common_grpc::Error,
1027        #[snafu(implicit)]
1028        location: Location,
1029    },
1030}
1031
/// Integrates [MetadataError] with the common [ErrorExt] trait.
impl ErrorExt for MetadataError {
    /// Returns [StatusCode::InvalidArguments] for every variant; there is no
    /// per-variant mapping.
    fn status_code(&self) -> StatusCode {
        StatusCode::InvalidArguments
    }

    /// Exposes `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
1041
1042/// Set column fulltext options if it passed the validation.
1043///
1044/// Options allowed to modify:
1045/// * backend
1046///
1047/// Options not allowed to modify:
1048/// * analyzer
1049/// * case_sensitive
1050fn set_column_fulltext_options(
1051    column_meta: &mut ColumnMetadata,
1052    column_name: String,
1053    options: FulltextOptions,
1054    current_options: Option<FulltextOptions>,
1055) -> Result<()> {
1056    if let Some(current_options) = current_options {
1057        ensure!(
1058            current_options.analyzer == options.analyzer
1059                && current_options.case_sensitive == options.case_sensitive,
1060            InvalidColumnOptionSnafu {
1061                column_name,
1062                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1063                current_options.analyzer, current_options.case_sensitive),
1064            }
1065        );
1066    }
1067
1068    column_meta
1069        .column_schema
1070        .set_fulltext_options(&options)
1071        .context(SetFulltextOptionsSnafu { column_name })?;
1072
1073    Ok(())
1074}
1075
1076fn unset_column_fulltext_options(
1077    column_meta: &mut ColumnMetadata,
1078    column_name: String,
1079    current_options: Option<FulltextOptions>,
1080) -> Result<()> {
1081    if let Some(mut current_options) = current_options
1082        && current_options.enable
1083    {
1084        current_options.enable = false;
1085        column_meta
1086            .column_schema
1087            .set_fulltext_options(&current_options)
1088            .context(SetFulltextOptionsSnafu { column_name })?;
1089    } else {
1090        return InvalidColumnOptionSnafu {
1091            column_name,
1092            msg: "FULLTEXT index already disabled",
1093        }
1094        .fail();
1095    }
1096
1097    Ok(())
1098}
1099
1100#[cfg(test)]
1101mod test {
1102    use datatypes::prelude::ConcreteDataType;
1103    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1104
1105    use super::*;
1106
1107    fn create_builder() -> RegionMetadataBuilder {
1108        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1109    }
1110
1111    fn build_test_region_metadata() -> RegionMetadata {
1112        let mut builder = create_builder();
1113        builder
1114            .push_column_metadata(ColumnMetadata {
1115                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1116                semantic_type: SemanticType::Tag,
1117                column_id: 1,
1118            })
1119            .push_column_metadata(ColumnMetadata {
1120                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1121                semantic_type: SemanticType::Field,
1122                column_id: 2,
1123            })
1124            .push_column_metadata(ColumnMetadata {
1125                column_schema: ColumnSchema::new(
1126                    "c",
1127                    ConcreteDataType::timestamp_millisecond_datatype(),
1128                    false,
1129                ),
1130                semantic_type: SemanticType::Timestamp,
1131                column_id: 3,
1132            })
1133            .primary_key(vec![1]);
1134        builder.build().unwrap()
1135    }
1136
1137    #[test]
1138    fn test_region_metadata() {
1139        let region_metadata = build_test_region_metadata();
1140        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1141        assert_eq!(
1142            "a",
1143            region_metadata.column_by_id(1).unwrap().column_schema.name
1144        );
1145        assert_eq!(None, region_metadata.column_by_id(10));
1146    }
1147
1148    #[test]
1149    fn test_region_metadata_serde() {
1150        let region_metadata = build_test_region_metadata();
1151        let serialized = serde_json::to_string(&region_metadata).unwrap();
1152        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1153        assert_eq!(region_metadata, deserialized);
1154    }
1155
1156    #[test]
1157    fn test_column_metadata_validate() {
1158        let mut builder = create_builder();
1159        let col = ColumnMetadata {
1160            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1161            semantic_type: SemanticType::Timestamp,
1162            column_id: 1,
1163        };
1164
1165        builder.push_column_metadata(col);
1166        let err = builder.build().unwrap_err();
1167        assert!(
1168            err.to_string()
1169                .contains("column `ts` is not timestamp type"),
1170            "unexpected err: {err}",
1171        );
1172    }
1173
1174    #[test]
1175    fn test_empty_region_metadata() {
1176        let builder = create_builder();
1177        let err = builder.build().unwrap_err();
1178        // A region must have a time index.
1179        assert!(
1180            err.to_string().contains("time index not found"),
1181            "unexpected err: {err}",
1182        );
1183    }
1184
1185    #[test]
1186    fn test_same_column_id() {
1187        let mut builder = create_builder();
1188        builder
1189            .push_column_metadata(ColumnMetadata {
1190                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1191                semantic_type: SemanticType::Tag,
1192                column_id: 1,
1193            })
1194            .push_column_metadata(ColumnMetadata {
1195                column_schema: ColumnSchema::new(
1196                    "b",
1197                    ConcreteDataType::timestamp_millisecond_datatype(),
1198                    false,
1199                ),
1200                semantic_type: SemanticType::Timestamp,
1201                column_id: 1,
1202            });
1203        let err = builder.build().unwrap_err();
1204        assert!(
1205            err.to_string()
1206                .contains("column a and b have the same column id"),
1207            "unexpected err: {err}",
1208        );
1209    }
1210
1211    #[test]
1212    fn test_duplicate_time_index() {
1213        let mut builder = create_builder();
1214        builder
1215            .push_column_metadata(ColumnMetadata {
1216                column_schema: ColumnSchema::new(
1217                    "a",
1218                    ConcreteDataType::timestamp_millisecond_datatype(),
1219                    false,
1220                ),
1221                semantic_type: SemanticType::Timestamp,
1222                column_id: 1,
1223            })
1224            .push_column_metadata(ColumnMetadata {
1225                column_schema: ColumnSchema::new(
1226                    "b",
1227                    ConcreteDataType::timestamp_millisecond_datatype(),
1228                    false,
1229                ),
1230                semantic_type: SemanticType::Timestamp,
1231                column_id: 2,
1232            });
1233        let err = builder.build().unwrap_err();
1234        assert!(
1235            err.to_string().contains("expect only one time index"),
1236            "unexpected err: {err}",
1237        );
1238    }
1239
1240    #[test]
1241    fn test_unknown_primary_key() {
1242        let mut builder = create_builder();
1243        builder
1244            .push_column_metadata(ColumnMetadata {
1245                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1246                semantic_type: SemanticType::Tag,
1247                column_id: 1,
1248            })
1249            .push_column_metadata(ColumnMetadata {
1250                column_schema: ColumnSchema::new(
1251                    "b",
1252                    ConcreteDataType::timestamp_millisecond_datatype(),
1253                    false,
1254                ),
1255                semantic_type: SemanticType::Timestamp,
1256                column_id: 2,
1257            })
1258            .primary_key(vec![3]);
1259        let err = builder.build().unwrap_err();
1260        assert!(
1261            err.to_string().contains("unknown column id 3"),
1262            "unexpected err: {err}",
1263        );
1264    }
1265
1266    #[test]
1267    fn test_same_primary_key() {
1268        let mut builder = create_builder();
1269        builder
1270            .push_column_metadata(ColumnMetadata {
1271                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1272                semantic_type: SemanticType::Tag,
1273                column_id: 1,
1274            })
1275            .push_column_metadata(ColumnMetadata {
1276                column_schema: ColumnSchema::new(
1277                    "b",
1278                    ConcreteDataType::timestamp_millisecond_datatype(),
1279                    false,
1280                ),
1281                semantic_type: SemanticType::Timestamp,
1282                column_id: 2,
1283            })
1284            .primary_key(vec![1, 1]);
1285        let err = builder.build().unwrap_err();
1286        assert!(
1287            err.to_string()
1288                .contains("duplicate column a in primary key"),
1289            "unexpected err: {err}",
1290        );
1291    }
1292
1293    #[test]
1294    fn test_in_time_index() {
1295        let mut builder = create_builder();
1296        builder
1297            .push_column_metadata(ColumnMetadata {
1298                column_schema: ColumnSchema::new(
1299                    "ts",
1300                    ConcreteDataType::timestamp_millisecond_datatype(),
1301                    false,
1302                ),
1303                semantic_type: SemanticType::Timestamp,
1304                column_id: 1,
1305            })
1306            .primary_key(vec![1]);
1307        let err = builder.build().unwrap_err();
1308        assert!(
1309            err.to_string()
1310                .contains("column ts is already a time index column"),
1311            "unexpected err: {err}",
1312        );
1313    }
1314
1315    #[test]
1316    fn test_nullable_time_index() {
1317        let mut builder = create_builder();
1318        builder.push_column_metadata(ColumnMetadata {
1319            column_schema: ColumnSchema::new(
1320                "ts",
1321                ConcreteDataType::timestamp_millisecond_datatype(),
1322                true,
1323            ),
1324            semantic_type: SemanticType::Timestamp,
1325            column_id: 1,
1326        });
1327        let err = builder.build().unwrap_err();
1328        assert!(
1329            err.to_string()
1330                .contains("time index column ts must be NOT NULL"),
1331            "unexpected err: {err}",
1332        );
1333    }
1334
1335    #[test]
1336    fn test_primary_key_semantic_type() {
1337        let mut builder = create_builder();
1338        builder
1339            .push_column_metadata(ColumnMetadata {
1340                column_schema: ColumnSchema::new(
1341                    "ts",
1342                    ConcreteDataType::timestamp_millisecond_datatype(),
1343                    false,
1344                ),
1345                semantic_type: SemanticType::Timestamp,
1346                column_id: 1,
1347            })
1348            .push_column_metadata(ColumnMetadata {
1349                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1350                semantic_type: SemanticType::Field,
1351                column_id: 2,
1352            })
1353            .primary_key(vec![2]);
1354        let err = builder.build().unwrap_err();
1355        assert!(
1356            err.to_string()
1357                .contains("semantic type of column a should be Tag, not Field"),
1358            "unexpected err: {err}",
1359        );
1360    }
1361
1362    #[test]
1363    fn test_primary_key_tag_num() {
1364        let mut builder = create_builder();
1365        builder
1366            .push_column_metadata(ColumnMetadata {
1367                column_schema: ColumnSchema::new(
1368                    "ts",
1369                    ConcreteDataType::timestamp_millisecond_datatype(),
1370                    false,
1371                ),
1372                semantic_type: SemanticType::Timestamp,
1373                column_id: 1,
1374            })
1375            .push_column_metadata(ColumnMetadata {
1376                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1377                semantic_type: SemanticType::Tag,
1378                column_id: 2,
1379            })
1380            .push_column_metadata(ColumnMetadata {
1381                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1382                semantic_type: SemanticType::Tag,
1383                column_id: 3,
1384            })
1385            .primary_key(vec![2]);
1386        let err = builder.build().unwrap_err();
1387        assert!(
1388            err.to_string()
1389                .contains("number of primary key columns 1 not equal to tag columns 2"),
1390            "unexpected err: {err}",
1391        );
1392    }
1393
1394    #[test]
1395    fn test_bump_version() {
1396        let mut region_metadata = build_test_region_metadata();
1397        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1398        builder.bump_version();
1399        let new_meta = builder.build().unwrap();
1400        region_metadata.schema_version += 1;
1401        assert_eq!(region_metadata, new_meta);
1402    }
1403
1404    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1405        let semantic_type = if is_tag {
1406            SemanticType::Tag
1407        } else {
1408            SemanticType::Field
1409        };
1410        ColumnMetadata {
1411            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1412            semantic_type,
1413            column_id,
1414        }
1415    }
1416
1417    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1418        let actual: Vec<_> = metadata
1419            .column_metadatas
1420            .iter()
1421            .map(|col| &col.column_schema.name)
1422            .collect();
1423        assert_eq!(names, actual);
1424    }
1425
1426    #[test]
1427    fn test_alter() {
1428        // a (tag), b (field), c (ts)
1429        let metadata = build_test_region_metadata();
1430        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1431        // tag d
1432        builder
1433            .alter(AlterKind::AddColumns {
1434                columns: vec![AddColumn {
1435                    column_metadata: new_column_metadata("d", true, 4),
1436                    location: None,
1437                }],
1438            })
1439            .unwrap();
1440        let metadata = builder.build().unwrap();
1441        check_columns(&metadata, &["a", "b", "c", "d"]);
1442        assert_eq!([1, 4], &metadata.primary_key[..]);
1443
1444        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1445        builder
1446            .alter(AlterKind::AddColumns {
1447                columns: vec![AddColumn {
1448                    column_metadata: new_column_metadata("e", false, 5),
1449                    location: Some(AddColumnLocation::First),
1450                }],
1451            })
1452            .unwrap();
1453        let metadata = builder.build().unwrap();
1454        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1455
1456        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1457        builder
1458            .alter(AlterKind::AddColumns {
1459                columns: vec![AddColumn {
1460                    column_metadata: new_column_metadata("f", false, 6),
1461                    location: Some(AddColumnLocation::After {
1462                        column_name: "b".to_string(),
1463                    }),
1464                }],
1465            })
1466            .unwrap();
1467        let metadata = builder.build().unwrap();
1468        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1469
1470        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1471        builder
1472            .alter(AlterKind::AddColumns {
1473                columns: vec![AddColumn {
1474                    column_metadata: new_column_metadata("g", false, 7),
1475                    location: Some(AddColumnLocation::After {
1476                        column_name: "d".to_string(),
1477                    }),
1478                }],
1479            })
1480            .unwrap();
1481        let metadata = builder.build().unwrap();
1482        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1483
1484        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1485        builder
1486            .alter(AlterKind::DropColumns {
1487                names: vec!["g".to_string(), "e".to_string()],
1488            })
1489            .unwrap();
1490        let metadata = builder.build().unwrap();
1491        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1492
1493        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1494        builder
1495            .alter(AlterKind::DropColumns {
1496                names: vec!["a".to_string()],
1497            })
1498            .unwrap();
1499        // Build returns error as the primary key contains a.
1500        let err = builder.build().unwrap_err();
1501        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1502
1503        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1504        builder
1505            .alter(AlterKind::ModifyColumnTypes {
1506                columns: vec![ModifyColumnType {
1507                    column_name: "b".to_string(),
1508                    target_type: ConcreteDataType::string_datatype(),
1509                }],
1510            })
1511            .unwrap();
1512        let metadata = builder.build().unwrap();
1513        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1514        let b_type = &metadata
1515            .column_by_name("b")
1516            .unwrap()
1517            .column_schema
1518            .data_type;
1519        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1520
1521        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1522        builder
1523            .alter(AlterKind::SetIndex {
1524                options: ApiSetIndexOptions::Fulltext {
1525                    column_name: "b".to_string(),
1526                    options: FulltextOptions {
1527                        enable: true,
1528                        analyzer: FulltextAnalyzer::Chinese,
1529                        case_sensitive: true,
1530                        backend: FulltextBackend::Bloom,
1531                    },
1532                },
1533            })
1534            .unwrap();
1535        let metadata = builder.build().unwrap();
1536        let a_fulltext_options = metadata
1537            .column_by_name("b")
1538            .unwrap()
1539            .column_schema
1540            .fulltext_options()
1541            .unwrap()
1542            .unwrap();
1543        assert!(a_fulltext_options.enable);
1544        assert_eq!(
1545            datatypes::schema::FulltextAnalyzer::Chinese,
1546            a_fulltext_options.analyzer
1547        );
1548        assert!(a_fulltext_options.case_sensitive);
1549
1550        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1551        builder
1552            .alter(AlterKind::UnsetIndex {
1553                options: ApiUnsetIndexOptions::Fulltext {
1554                    column_name: "b".to_string(),
1555                },
1556            })
1557            .unwrap();
1558        let metadata = builder.build().unwrap();
1559        let a_fulltext_options = metadata
1560            .column_by_name("b")
1561            .unwrap()
1562            .column_schema
1563            .fulltext_options()
1564            .unwrap()
1565            .unwrap();
1566        assert!(!a_fulltext_options.enable);
1567        assert_eq!(
1568            datatypes::schema::FulltextAnalyzer::Chinese,
1569            a_fulltext_options.analyzer
1570        );
1571        assert!(a_fulltext_options.case_sensitive);
1572    }
1573
1574    #[test]
1575    fn test_add_if_not_exists() {
1576        // a (tag), b (field), c (ts)
1577        let metadata = build_test_region_metadata();
1578        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1579        // tag d
1580        builder
1581            .alter(AlterKind::AddColumns {
1582                columns: vec![
1583                    AddColumn {
1584                        column_metadata: new_column_metadata("d", true, 4),
1585                        location: None,
1586                    },
1587                    AddColumn {
1588                        column_metadata: new_column_metadata("d", true, 4),
1589                        location: None,
1590                    },
1591                ],
1592            })
1593            .unwrap();
1594        let metadata = builder.build().unwrap();
1595        check_columns(&metadata, &["a", "b", "c", "d"]);
1596        assert_eq!([1, 4], &metadata.primary_key[..]);
1597
1598        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1599        // field b.
1600        builder
1601            .alter(AlterKind::AddColumns {
1602                columns: vec![AddColumn {
1603                    column_metadata: new_column_metadata("b", false, 2),
1604                    location: None,
1605                }],
1606            })
1607            .unwrap();
1608        let metadata = builder.build().unwrap();
1609        check_columns(&metadata, &["a", "b", "c", "d"]);
1610    }
1611
1612    #[test]
1613    fn test_add_column_with_inverted_index() {
1614        // only set inverted index to true explicitly will this column be inverted indexed
1615
1616        // a (tag), b (field), c (ts)
1617        let metadata = build_test_region_metadata();
1618        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1619        // tag d, e
1620        let mut col = new_column_metadata("d", true, 4);
1621        col.column_schema.set_inverted_index(true);
1622        builder
1623            .alter(AlterKind::AddColumns {
1624                columns: vec![
1625                    AddColumn {
1626                        column_metadata: col,
1627                        location: None,
1628                    },
1629                    AddColumn {
1630                        column_metadata: new_column_metadata("e", true, 5),
1631                        location: None,
1632                    },
1633                ],
1634            })
1635            .unwrap();
1636        let metadata = builder.build().unwrap();
1637        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1638        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1639        let column_metadata = metadata.column_by_name("a").unwrap();
1640        assert!(!column_metadata.column_schema.is_inverted_indexed());
1641        let column_metadata = metadata.column_by_name("b").unwrap();
1642        assert!(!column_metadata.column_schema.is_inverted_indexed());
1643        let column_metadata = metadata.column_by_name("c").unwrap();
1644        assert!(!column_metadata.column_schema.is_inverted_indexed());
1645        let column_metadata = metadata.column_by_name("d").unwrap();
1646        assert!(column_metadata.column_schema.is_inverted_indexed());
1647        let column_metadata = metadata.column_by_name("e").unwrap();
1648        assert!(!column_metadata.column_schema.is_inverted_indexed());
1649    }
1650
1651    #[test]
1652    fn test_drop_if_exists() {
1653        // a (tag), b (field), c (ts)
1654        let metadata = build_test_region_metadata();
1655        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1656        // field d, e
1657        builder
1658            .alter(AlterKind::AddColumns {
1659                columns: vec![
1660                    AddColumn {
1661                        column_metadata: new_column_metadata("d", false, 4),
1662                        location: None,
1663                    },
1664                    AddColumn {
1665                        column_metadata: new_column_metadata("e", false, 5),
1666                        location: None,
1667                    },
1668                ],
1669            })
1670            .unwrap();
1671        let metadata = builder.build().unwrap();
1672        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1673
1674        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1675        builder
1676            .alter(AlterKind::DropColumns {
1677                names: vec!["b".to_string(), "b".to_string()],
1678            })
1679            .unwrap();
1680        let metadata = builder.build().unwrap();
1681        check_columns(&metadata, &["a", "c", "d", "e"]);
1682
1683        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1684        builder
1685            .alter(AlterKind::DropColumns {
1686                names: vec!["b".to_string(), "e".to_string()],
1687            })
1688            .unwrap();
1689        let metadata = builder.build().unwrap();
1690        check_columns(&metadata, &["a", "c", "d"]);
1691    }
1692
1693    #[test]
1694    fn test_invalid_column_name() {
1695        let mut builder = create_builder();
1696        builder.push_column_metadata(ColumnMetadata {
1697            column_schema: ColumnSchema::new(
1698                "__sequence",
1699                ConcreteDataType::timestamp_millisecond_datatype(),
1700                false,
1701            ),
1702            semantic_type: SemanticType::Timestamp,
1703            column_id: 1,
1704        });
1705        let err = builder.build().unwrap_err();
1706        assert!(
1707            err.to_string()
1708                .contains("internal column name that can not be used"),
1709            "unexpected err: {err}",
1710        );
1711    }
1712
1713    #[test]
1714    fn test_debug_for_column_metadata() {
1715        let region_metadata = build_test_region_metadata();
1716        let formatted = format!("{:?}", region_metadata);
1717        assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
1718    }
1719
1720    #[test]
1721    fn test_region_metadata_deserialize_default_primary_key_encoding() {
1722        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
1723        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1724        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
1725
1726        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
1727        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1728        assert_eq!(
1729            deserialized.primary_key_encoding,
1730            PrimaryKeyEncoding::Sparse
1731        );
1732    }
1733}