store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::column_def::try_as_column_schema;
25use api::v1::region::RegionColumnDef;
26use api::v1::SemanticType;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
33use serde::de::Error;
34use serde::{Deserialize, Deserializer, Serialize};
35use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
36
37use crate::codec::PrimaryKeyEncoding;
38use crate::region_request::{
39    AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, ApiUnsetIndexOptions,
40    ModifyColumnType,
41};
42use crate::storage::consts::is_internal_column;
43use crate::storage::{ColumnId, RegionId};
44
/// Result type used throughout this module, with [MetadataError] as the error type.
pub type Result<T> = std::result::Result<T, MetadataError>;
46
/// Metadata of a column.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Id of this column. It must be unique among the columns of a region, which is
    /// checked when a [RegionMetadata] is built.
    pub column_id: ColumnId,
}
57
58impl fmt::Debug for ColumnMetadata {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        write!(
61            f,
62            "[{:?} {:?} {:?}]",
63            self.column_schema, self.semantic_type, self.column_id,
64        )
65    }
66}
67
68impl ColumnMetadata {
69    /// Construct `Self` from protobuf struct [RegionColumnDef]
70    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
71        let column_id = column_def.column_id;
72        let column_def = column_def
73            .column_def
74            .context(InvalidRawRegionRequestSnafu {
75                err: "column_def is absent",
76            })?;
77        let semantic_type = column_def.semantic_type();
78        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
79
80        Ok(Self {
81            column_schema,
82            semantic_type,
83            column_id,
84        })
85    }
86
87    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
88    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
89        serde_json::to_vec(columns)
90    }
91
92    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
93    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
94        serde_json::from_slice(bytes)
95    }
96
97    pub fn is_same_datatype(&self, other: &Self) -> bool {
98        self.column_schema.data_type == other.column_schema.data_type
99    }
100}
101
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    ///
    /// Skipped during serialization and rebuilt from the columns on deserialization.
    #[serde(skip)]
    pub schema: SchemaRef,

    // We don't make `time_index` and `id_to_index` public and always construct them via
    // [SkippedFields] so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    ///
    /// Falls back to the type's `Default` when the field is missing in serialized data.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
157
158impl fmt::Debug for RegionMetadata {
159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160        f.debug_struct("RegionMetadata")
161            .field("column_metadatas", &self.column_metadatas)
162            .field("time_index", &self.time_index)
163            .field("primary_key", &self.primary_key)
164            .field("region_id", &self.region_id)
165            .field("schema_version", &self.schema_version)
166            .finish()
167    }
168}
169
/// Shared, reference-counted ([Arc]) pointer to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
171
172impl<'de> Deserialize<'de> for RegionMetadata {
173    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
174    where
175        D: Deserializer<'de>,
176    {
177        // helper internal struct for deserialization
178        #[derive(Deserialize)]
179        struct RegionMetadataWithoutSchema {
180            column_metadatas: Vec<ColumnMetadata>,
181            primary_key: Vec<ColumnId>,
182            region_id: RegionId,
183            schema_version: u64,
184            #[serde(default)]
185            primary_key_encoding: PrimaryKeyEncoding,
186        }
187
188        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
189        let skipped =
190            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
191
192        Ok(Self {
193            schema: skipped.schema,
194            time_index: skipped.time_index,
195            id_to_index: skipped.id_to_index,
196            column_metadatas: without_schema.column_metadatas,
197            primary_key: without_schema.primary_key,
198            region_id: without_schema.region_id,
199            schema_version: without_schema.schema_version,
200            primary_key_encoding: without_schema.primary_key_encoding,
201        })
202    }
203}
204
205impl RegionMetadata {
206    /// Decode the metadata from a JSON str.
207    pub fn from_json(s: &str) -> Result<Self> {
208        serde_json::from_str(s).context(SerdeJsonSnafu)
209    }
210
211    /// Encode the metadata to a JSON string.
212    pub fn to_json(&self) -> Result<String> {
213        serde_json::to_string(&self).context(SerdeJsonSnafu)
214    }
215
216    /// Find column by id.
217    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
218        self.id_to_index
219            .get(&column_id)
220            .map(|index| &self.column_metadatas[*index])
221    }
222
223    /// Find column index by id.
224    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
225        self.id_to_index.get(&column_id).copied()
226    }
227
228    /// Find column index by name.
229    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
230        self.column_metadatas
231            .iter()
232            .position(|col| col.column_schema.name == column_name)
233    }
234
235    /// Returns the time index column
236    ///
237    /// # Panics
238    /// Panics if the time index column id is invalid.
239    pub fn time_index_column(&self) -> &ColumnMetadata {
240        let index = self.id_to_index[&self.time_index];
241        &self.column_metadatas[index]
242    }
243
244    /// Returns the position of the time index.
245    pub fn time_index_column_pos(&self) -> usize {
246        self.id_to_index[&self.time_index]
247    }
248
249    /// Returns the arrow field of the time index column.
250    pub fn time_index_field(&self) -> FieldRef {
251        let index = self.id_to_index[&self.time_index];
252        self.schema.arrow_schema().fields[index].clone()
253    }
254
255    /// Finds a column by name.
256    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
257        self.schema
258            .column_index_by_name(name)
259            .map(|index| &self.column_metadatas[index])
260    }
261
262    /// Returns all primary key columns.
263    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
264        // safety: RegionMetadata::validate ensures every primary key exists.
265        self.primary_key
266            .iter()
267            .map(|id| self.column_by_id(*id).unwrap())
268    }
269
270    /// Returns all field columns before projection.
271    ///
272    /// **Use with caution**. On read path where might have projection, this method
273    /// can return columns that not present in data batch.
274    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
275        self.column_metadatas
276            .iter()
277            .filter(|column| column.semantic_type == SemanticType::Field)
278    }
279
280    /// Returns a column's index in primary key if it is a primary key column.
281    ///
282    /// This does a linear search.
283    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
284        self.primary_key.iter().position(|id| *id == column_id)
285    }
286
287    /// Project the metadata to a new one using specified column ids.
288    ///
289    /// [RegionId] and schema version are preserved.
290    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
291        // check time index
292        ensure!(
293            projection.contains(&self.time_index),
294            TimeIndexNotFoundSnafu
295        );
296
297        // prepare new indices
298        let indices_to_preserve = projection
299            .iter()
300            .map(|id| {
301                self.column_index_by_id(*id)
302                    .with_context(|| InvalidRegionRequestSnafu {
303                        region_id: self.region_id,
304                        err: format!("column id {} not found", id),
305                    })
306            })
307            .collect::<Result<Vec<_>>>()?;
308
309        // project schema
310        let projected_schema =
311            self.schema
312                .try_project(&indices_to_preserve)
313                .with_context(|_| SchemaProjectSnafu {
314                    origin_schema: self.schema.clone(),
315                    projection: projection.to_vec(),
316                })?;
317
318        // project columns, generate projected primary key and new id_to_index
319        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
320        let mut projected_primary_key = vec![];
321        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
322        for index in indices_to_preserve {
323            let col = self.column_metadatas[index].clone();
324            if col.semantic_type == SemanticType::Tag {
325                projected_primary_key.push(col.column_id);
326            }
327            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
328            projected_column_metadatas.push(col);
329        }
330
331        Ok(RegionMetadata {
332            schema: Arc::new(projected_schema),
333            time_index: self.time_index,
334            id_to_index: projected_id_to_index,
335            column_metadatas: projected_column_metadatas,
336            primary_key: projected_primary_key,
337            region_id: self.region_id,
338            schema_version: self.schema_version,
339            primary_key_encoding: self.primary_key_encoding,
340        })
341    }
342
343    /// Gets the column ids to be indexed by inverted index.
344    pub fn inverted_indexed_column_ids<'a>(
345        &self,
346        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
347    ) -> HashSet<ColumnId> {
348        let mut inverted_index = self
349            .column_metadatas
350            .iter()
351            .filter(|column| column.column_schema.is_inverted_indexed())
352            .map(|column| column.column_id)
353            .collect::<HashSet<_>>();
354
355        for ignored in ignore_column_ids {
356            inverted_index.remove(ignored);
357        }
358
359        inverted_index
360    }
361
362    /// Checks whether the metadata is valid.
363    fn validate(&self) -> Result<()> {
364        // Id to name.
365        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
366        for col in &self.column_metadatas {
367            // Validate each column.
368            Self::validate_column_metadata(col)?;
369
370            // Check whether column id is duplicated. We already check column name
371            // is unique in `Schema` so we only check column id here.
372            ensure!(
373                !id_names.contains_key(&col.column_id),
374                InvalidMetaSnafu {
375                    reason: format!(
376                        "column {} and {} have the same column id {}",
377                        id_names[&col.column_id], col.column_schema.name, col.column_id,
378                    ),
379                }
380            );
381            id_names.insert(col.column_id, &col.column_schema.name);
382        }
383
384        // Checks there is only one time index.
385        let num_time_index = self
386            .column_metadatas
387            .iter()
388            .filter(|col| col.semantic_type == SemanticType::Timestamp)
389            .count();
390        ensure!(
391            num_time_index == 1,
392            InvalidMetaSnafu {
393                reason: format!("expect only one time index, found {}", num_time_index),
394            }
395        );
396
397        // Checks the time index column is not nullable.
398        ensure!(
399            !self.time_index_column().column_schema.is_nullable(),
400            InvalidMetaSnafu {
401                reason: format!(
402                    "time index column {} must be NOT NULL",
403                    self.time_index_column().column_schema.name
404                ),
405            }
406        );
407
408        if !self.primary_key.is_empty() {
409            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
410            // Checks column ids in the primary key is valid.
411            for column_id in &self.primary_key {
412                // Checks whether the column id exists.
413                ensure!(
414                    id_names.contains_key(column_id),
415                    InvalidMetaSnafu {
416                        reason: format!("unknown column id {}", column_id),
417                    }
418                );
419
420                // Safety: Column with specific id must exist.
421                let column = self.column_by_id(*column_id).unwrap();
422                // Checks duplicate.
423                ensure!(
424                    !pk_ids.contains(&column_id),
425                    InvalidMetaSnafu {
426                        reason: format!(
427                            "duplicate column {} in primary key",
428                            column.column_schema.name
429                        ),
430                    }
431                );
432
433                // Checks this is not a time index column.
434                ensure!(
435                    *column_id != self.time_index,
436                    InvalidMetaSnafu {
437                        reason: format!(
438                            "column {} is already a time index column",
439                            column.column_schema.name,
440                        ),
441                    }
442                );
443
444                // Checks semantic type.
445                ensure!(
446                    column.semantic_type == SemanticType::Tag,
447                    InvalidMetaSnafu {
448                        reason: format!(
449                            "semantic type of column {} should be Tag, not {:?}",
450                            column.column_schema.name, column.semantic_type
451                        ),
452                    }
453                );
454
455                pk_ids.insert(column_id);
456            }
457        }
458
459        // Checks tag semantic type.
460        let num_tag = self
461            .column_metadatas
462            .iter()
463            .filter(|col| col.semantic_type == SemanticType::Tag)
464            .count();
465        ensure!(
466            num_tag == self.primary_key.len(),
467            InvalidMetaSnafu {
468                reason: format!(
469                    "number of primary key columns {} not equal to tag columns {}",
470                    self.primary_key.len(),
471                    num_tag
472                ),
473            }
474        );
475
476        Ok(())
477    }
478
479    /// Checks whether it is a valid column.
480    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
481        if column_metadata.semantic_type == SemanticType::Timestamp {
482            ensure!(
483                column_metadata.column_schema.data_type.is_timestamp(),
484                InvalidMetaSnafu {
485                    reason: format!(
486                        "column `{}` is not timestamp type",
487                        column_metadata.column_schema.name
488                    ),
489                }
490            );
491        }
492
493        ensure!(
494            !is_internal_column(&column_metadata.column_schema.name),
495            InvalidMetaSnafu {
496                reason: format!(
497                    "{} is internal column name that can not be used",
498                    column_metadata.column_schema.name
499                ),
500            }
501        );
502
503        Ok(())
504    }
505}
506
/// Builder to build [RegionMetadata].
///
/// The derived fields of [RegionMetadata] (`schema`, `time_index` and `id_to_index`) are
/// not stored here; they are computed from `column_metadatas` in
/// [build](RegionMetadataBuilder::build).
pub struct RegionMetadataBuilder {
    /// Id of the region to build metadata for.
    region_id: RegionId,
    /// Columns of the region, in schema order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered column ids of the primary key.
    primary_key: Vec<ColumnId>,
    /// Schema version the built metadata will carry.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
}
515
516impl RegionMetadataBuilder {
517    /// Returns a new builder.
518    pub fn new(id: RegionId) -> Self {
519        Self {
520            region_id: id,
521            column_metadatas: vec![],
522            primary_key: vec![],
523            schema_version: 0,
524            primary_key_encoding: PrimaryKeyEncoding::Dense,
525        }
526    }
527
528    /// Creates a builder from existing [RegionMetadata].
529    pub fn from_existing(existing: RegionMetadata) -> Self {
530        Self {
531            column_metadatas: existing.column_metadatas,
532            primary_key: existing.primary_key,
533            region_id: existing.region_id,
534            schema_version: existing.schema_version,
535            primary_key_encoding: existing.primary_key_encoding,
536        }
537    }
538
539    /// Sets the primary key encoding mode.
540    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
541        self.primary_key_encoding = encoding;
542        self
543    }
544
545    /// Pushes a new column metadata to this region's metadata.
546    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
547        self.column_metadatas.push(column_metadata);
548        self
549    }
550
551    /// Sets the primary key of the region.
552    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
553        self.primary_key = key;
554        self
555    }
556
557    /// Increases the schema version by 1.
558    pub fn bump_version(&mut self) -> &mut Self {
559        self.schema_version += 1;
560        self
561    }
562
563    /// Applies the alter `kind` to the builder.
564    ///
565    /// The `kind` should be valid.
566    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
567        match kind {
568            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
569            AlterKind::DropColumns { names } => self.drop_columns(&names),
570            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns),
571            AlterKind::SetIndex { options } => match options {
572                ApiSetIndexOptions::Fulltext {
573                    column_name,
574                    options,
575                } => self.change_column_fulltext_options(column_name, true, Some(options))?,
576                ApiSetIndexOptions::Inverted { column_name } => {
577                    self.change_column_inverted_index_options(column_name, true)?
578                }
579                ApiSetIndexOptions::Skipping {
580                    column_name,
581                    options,
582                } => self.change_column_skipping_index_options(column_name, Some(options))?,
583            },
584            AlterKind::UnsetIndex { options } => match options {
585                ApiUnsetIndexOptions::Fulltext { column_name } => {
586                    self.change_column_fulltext_options(column_name, false, None)?
587                }
588                ApiUnsetIndexOptions::Inverted { column_name } => {
589                    self.change_column_inverted_index_options(column_name, false)?
590                }
591                ApiUnsetIndexOptions::Skipping { column_name } => {
592                    self.change_column_skipping_index_options(column_name, None)?
593                }
594            },
595            AlterKind::SetRegionOptions { options: _ } => {
596                // nothing to be done with RegionMetadata
597            }
598            AlterKind::UnsetRegionOptions { keys: _ } => {
599                // nothing to be done with RegionMetadata
600            }
601        }
602        Ok(self)
603    }
604
605    /// Consumes the builder and build a [RegionMetadata].
606    pub fn build(self) -> Result<RegionMetadata> {
607        let skipped = SkippedFields::new(&self.column_metadatas)?;
608
609        let meta = RegionMetadata {
610            schema: skipped.schema,
611            time_index: skipped.time_index,
612            id_to_index: skipped.id_to_index,
613            column_metadatas: self.column_metadatas,
614            primary_key: self.primary_key,
615            region_id: self.region_id,
616            schema_version: self.schema_version,
617            primary_key_encoding: self.primary_key_encoding,
618        };
619
620        meta.validate()?;
621
622        Ok(meta)
623    }
624
625    /// Adds columns to the metadata if not exist.
626    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
627        let mut names: HashSet<_> = self
628            .column_metadatas
629            .iter()
630            .map(|col| col.column_schema.name.clone())
631            .collect();
632
633        for add_column in columns {
634            if names.contains(&add_column.column_metadata.column_schema.name) {
635                // Column already exists.
636                continue;
637            }
638
639            let column_id = add_column.column_metadata.column_id;
640            let semantic_type = add_column.column_metadata.semantic_type;
641            let column_name = add_column.column_metadata.column_schema.name.clone();
642            match add_column.location {
643                None => {
644                    self.column_metadatas.push(add_column.column_metadata);
645                }
646                Some(AddColumnLocation::First) => {
647                    self.column_metadatas.insert(0, add_column.column_metadata);
648                }
649                Some(AddColumnLocation::After { column_name }) => {
650                    let pos = self
651                        .column_metadatas
652                        .iter()
653                        .position(|col| col.column_schema.name == column_name)
654                        .context(InvalidRegionRequestSnafu {
655                            region_id: self.region_id,
656                            err: format!(
657                                "column {} not found, failed to add column {} after it",
658                                column_name, add_column.column_metadata.column_schema.name
659                            ),
660                        })?;
661                    // Insert after pos.
662                    self.column_metadatas
663                        .insert(pos + 1, add_column.column_metadata);
664                }
665            }
666            names.insert(column_name);
667            if semantic_type == SemanticType::Tag {
668                // For a new tag, we extend the primary key.
669                self.primary_key.push(column_id);
670            }
671        }
672
673        Ok(())
674    }
675
676    /// Drops columns from the metadata if exist.
677    fn drop_columns(&mut self, names: &[String]) {
678        let name_set: HashSet<_> = names.iter().collect();
679        self.column_metadatas
680            .retain(|col| !name_set.contains(&col.column_schema.name));
681    }
682
683    /// Changes columns type to the metadata if exist.
684    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) {
685        let mut change_type_map: HashMap<_, _> = columns
686            .into_iter()
687            .map(
688                |ModifyColumnType {
689                     column_name,
690                     target_type,
691                 }| (column_name, target_type),
692            )
693            .collect();
694
695        for column_meta in self.column_metadatas.iter_mut() {
696            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
697                column_meta.column_schema.data_type = target_type;
698            }
699        }
700    }
701
702    fn change_column_inverted_index_options(
703        &mut self,
704        column_name: String,
705        value: bool,
706    ) -> Result<()> {
707        for column_meta in self.column_metadatas.iter_mut() {
708            if column_meta.column_schema.name == column_name {
709                column_meta.column_schema.set_inverted_index(value)
710            }
711        }
712        Ok(())
713    }
714
715    fn change_column_fulltext_options(
716        &mut self,
717        column_name: String,
718        enable: bool,
719        options: Option<FulltextOptions>,
720    ) -> Result<()> {
721        for column_meta in self.column_metadatas.iter_mut() {
722            if column_meta.column_schema.name == column_name {
723                ensure!(
724                    column_meta.column_schema.data_type.is_string(),
725                    InvalidColumnOptionSnafu {
726                        column_name,
727                        msg: "FULLTEXT index only supports string type".to_string(),
728                    }
729                );
730
731                let current_fulltext_options = column_meta
732                    .column_schema
733                    .fulltext_options()
734                    .context(SetFulltextOptionsSnafu {
735                        column_name: column_name.clone(),
736                    })?;
737
738                if enable {
739                    ensure!(
740                        options.is_some(),
741                        InvalidColumnOptionSnafu {
742                            column_name,
743                            msg: "FULLTEXT index options must be provided",
744                        }
745                    );
746                    set_column_fulltext_options(
747                        column_meta,
748                        column_name,
749                        options.unwrap(),
750                        current_fulltext_options,
751                    )?;
752                } else {
753                    unset_column_fulltext_options(
754                        column_meta,
755                        column_name,
756                        current_fulltext_options,
757                    )?;
758                }
759                break;
760            }
761        }
762        Ok(())
763    }
764
765    fn change_column_skipping_index_options(
766        &mut self,
767        column_name: String,
768        options: Option<SkippingIndexOptions>,
769    ) -> Result<()> {
770        for column_meta in self.column_metadatas.iter_mut() {
771            if column_meta.column_schema.name == column_name {
772                if let Some(options) = &options {
773                    column_meta
774                        .column_schema
775                        .set_skipping_options(options)
776                        .context(UnsetSkippingIndexOptionsSnafu {
777                            column_name: column_name.clone(),
778                        })?;
779                } else {
780                    column_meta.column_schema.unset_skipping_options().context(
781                        UnsetSkippingIndexOptionsSnafu {
782                            column_name: column_name.clone(),
783                        },
784                    )?;
785                }
786            }
787        }
788        Ok(())
789    }
790}
791
/// Fields skipped in serialization.
///
/// They are derived from the column metadatas, see [SkippedFields::new].
struct SkippedFields {
    /// Schema constructed from the column schemas of the column metadatas.
    schema: SchemaRef,
    /// Id of the time index column.
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    id_to_index: HashMap<ColumnId, usize>,
}
801
802impl SkippedFields {
803    /// Constructs skipped fields from `column_metadatas`.
804    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
805        let column_schemas = column_metadatas
806            .iter()
807            .map(|column_metadata| column_metadata.column_schema.clone())
808            .collect();
809        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
810        let time_index = column_metadatas
811            .iter()
812            .find_map(|col| {
813                if col.semantic_type == SemanticType::Timestamp {
814                    Some(col.column_id)
815                } else {
816                    None
817                }
818            })
819            .context(InvalidMetaSnafu {
820                reason: "time index not found",
821            })?;
822        let id_to_index = column_metadatas
823            .iter()
824            .enumerate()
825            .map(|(idx, col)| (col.column_id, idx))
826            .collect();
827
828        Ok(SkippedFields {
829            schema,
830            time_index,
831            id_to_index,
832        })
833    }
834}
835
836#[derive(Snafu)]
837#[snafu(visibility(pub))]
838#[stack_trace_debug]
839pub enum MetadataError {
840    #[snafu(display("Invalid schema"))]
841    InvalidSchema {
842        source: datatypes::error::Error,
843        #[snafu(implicit)]
844        location: Location,
845    },
846
847    #[snafu(display("Invalid metadata, {}", reason))]
848    InvalidMeta {
849        reason: String,
850        #[snafu(implicit)]
851        location: Location,
852    },
853
854    #[snafu(display("Failed to ser/de json object"))]
855    SerdeJson {
856        #[snafu(implicit)]
857        location: Location,
858        #[snafu(source)]
859        error: serde_json::Error,
860    },
861
862    #[snafu(display("Invalid raw region request, err: {}", err))]
863    InvalidRawRegionRequest {
864        err: String,
865        #[snafu(implicit)]
866        location: Location,
867    },
868
869    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
870    InvalidRegionRequest {
871        region_id: RegionId,
872        err: String,
873        #[snafu(implicit)]
874        location: Location,
875    },
876
877    #[snafu(display("Unexpected schema error during project"))]
878    SchemaProject {
879        origin_schema: SchemaRef,
880        projection: Vec<ColumnId>,
881        #[snafu(implicit)]
882        location: Location,
883        source: datatypes::Error,
884    },
885
886    #[snafu(display("Time index column not found"))]
887    TimeIndexNotFound {
888        #[snafu(implicit)]
889        location: Location,
890    },
891
892    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
893    ChangeColumnNotFound {
894        column_name: String,
895        region_id: RegionId,
896        #[snafu(implicit)]
897        location: Location,
898    },
899
900    #[snafu(display("Failed to convert column schema"))]
901    ConvertColumnSchema {
902        source: api::error::Error,
903        #[snafu(implicit)]
904        location: Location,
905    },
906
907    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
908    InvalidSetRegionOptionRequest {
909        key: String,
910        value: String,
911        #[snafu(implicit)]
912        location: Location,
913    },
914
915    #[snafu(display("Invalid set region option request, key: {}", key))]
916    InvalidUnsetRegionOptionRequest {
917        key: String,
918        #[snafu(implicit)]
919        location: Location,
920    },
921
922    #[snafu(display("Failed to decode protobuf"))]
923    DecodeProto {
924        #[snafu(source)]
925        error: prost::UnknownEnumValue,
926        #[snafu(implicit)]
927        location: Location,
928    },
929
930    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
931    InvalidColumnOption {
932        column_name: String,
933        msg: String,
934        #[snafu(implicit)]
935        location: Location,
936    },
937
938    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
939    SetFulltextOptions {
940        column_name: String,
941        source: datatypes::Error,
942        #[snafu(implicit)]
943        location: Location,
944    },
945
946    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
947    SetSkippingIndexOptions {
948        column_name: String,
949        source: datatypes::Error,
950        #[snafu(implicit)]
951        location: Location,
952    },
953
954    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
955    UnsetSkippingIndexOptions {
956        column_name: String,
957        source: datatypes::Error,
958        #[snafu(implicit)]
959        location: Location,
960    },
961
962    #[snafu(display("Failed to decode arrow ipc record batches"))]
963    DecodeArrowIpc {
964        #[snafu(source)]
965        error: arrow::error::ArrowError,
966        #[snafu(implicit)]
967        location: Location,
968    },
969
970    #[snafu(display("Unexpected: {}", reason))]
971    Unexpected {
972        reason: String,
973        #[snafu(implicit)]
974        location: Location,
975    },
976}
977
978impl ErrorExt for MetadataError {
979    fn status_code(&self) -> StatusCode {
980        StatusCode::InvalidArguments
981    }
982
983    fn as_any(&self) -> &dyn Any {
984        self
985    }
986}
987
988/// Set column fulltext options if it passed the validation.
989///
990/// Options allowed to modify:
991/// * backend
992///
993/// Options not allowed to modify:
994/// * analyzer
995/// * case_sensitive
996fn set_column_fulltext_options(
997    column_meta: &mut ColumnMetadata,
998    column_name: String,
999    options: FulltextOptions,
1000    current_options: Option<FulltextOptions>,
1001) -> Result<()> {
1002    if let Some(current_options) = current_options {
1003        ensure!(
1004            current_options.analyzer == options.analyzer
1005                && current_options.case_sensitive == options.case_sensitive,
1006            InvalidColumnOptionSnafu {
1007                column_name,
1008                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1009                current_options.analyzer, current_options.case_sensitive),
1010            }
1011        );
1012    }
1013
1014    column_meta
1015        .column_schema
1016        .set_fulltext_options(&options)
1017        .context(SetFulltextOptionsSnafu { column_name })?;
1018
1019    Ok(())
1020}
1021
1022fn unset_column_fulltext_options(
1023    column_meta: &mut ColumnMetadata,
1024    column_name: String,
1025    current_options: Option<FulltextOptions>,
1026) -> Result<()> {
1027    if let Some(mut current_options) = current_options
1028        && current_options.enable
1029    {
1030        current_options.enable = false;
1031        column_meta
1032            .column_schema
1033            .set_fulltext_options(&current_options)
1034            .context(SetFulltextOptionsSnafu { column_name })?;
1035    } else {
1036        return InvalidColumnOptionSnafu {
1037            column_name,
1038            msg: "FULLTEXT index already disabled",
1039        }
1040        .fail();
1041    }
1042
1043    Ok(())
1044}
1045
#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};

    use super::*;

    /// Returns an empty builder for the region shared by all tests.
    fn create_builder() -> RegionMetadataBuilder {
        let region_id = RegionId::new(1234, 5678);
        RegionMetadataBuilder::new(region_id)
    }

    /// Assembles a column metadata from its individual parts.
    fn column(
        name: &str,
        data_type: ConcreteDataType,
        is_nullable: bool,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, is_nullable),
            semantic_type,
            column_id,
        }
    }

    /// Creates a NOT NULL millisecond timestamp column with the timestamp semantic type.
    fn time_index_column(name: &str, column_id: ColumnId) -> ColumnMetadata {
        column(
            name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
            SemanticType::Timestamp,
            column_id,
        )
    }

    /// Creates a nullable string column that is either a tag or a field.
    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
        let semantic_type = if is_tag {
            SemanticType::Tag
        } else {
            SemanticType::Field
        };
        column(
            name,
            ConcreteDataType::string_datatype(),
            true,
            semantic_type,
            column_id,
        )
    }

    /// Wraps a column and its optional location into an [AddColumn].
    fn add_column(column_metadata: ColumnMetadata, location: Option<AddColumnLocation>) -> AddColumn {
        AddColumn {
            column_metadata,
            location,
        }
    }

    /// Builds the metadata with columns a (tag), b (field) and c (time index).
    fn build_test_region_metadata() -> RegionMetadata {
        let mut builder = create_builder();
        builder
            .push_column_metadata(column(
                "a",
                ConcreteDataType::int64_datatype(),
                false,
                SemanticType::Tag,
                1,
            ))
            .push_column_metadata(column(
                "b",
                ConcreteDataType::float64_datatype(),
                false,
                SemanticType::Field,
                2,
            ))
            .push_column_metadata(time_index_column("c", 3))
            .primary_key(vec![1]);
        builder.build().unwrap()
    }

    /// Applies `kind` on top of `metadata` and returns the rebuilt metadata.
    ///
    /// Panics if either the alteration or the build fails.
    fn alter_metadata(metadata: RegionMetadata, kind: AlterKind) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder.alter(kind).unwrap();
        builder.build().unwrap()
    }

    /// Builds `builder` and asserts that it fails with a message containing `expected`.
    fn assert_build_error(builder: RegionMetadataBuilder, expected: &str) {
        let err = builder.build().unwrap_err();
        assert!(err.to_string().contains(expected), "unexpected err: {err}");
    }

    /// Asserts that the columns of `metadata` are named `names`, in that order.
    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
        let actual: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_schema.name.as_str())
            .collect();
        assert_eq!(names, actual);
    }

    #[test]
    fn test_region_metadata() {
        let metadata = build_test_region_metadata();
        assert_eq!("c", metadata.time_index_column().column_schema.name);
        assert_eq!("a", metadata.column_by_id(1).unwrap().column_schema.name);
        assert_eq!(None, metadata.column_by_id(10));
    }

    #[test]
    fn test_region_metadata_serde() {
        let expected = build_test_region_metadata();
        let json = serde_json::to_string(&expected).unwrap();
        let actual: RegionMetadata = serde_json::from_str(&json).unwrap();
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_column_metadata_validate() {
        let mut builder = create_builder();
        builder.push_column_metadata(column(
            "ts",
            ConcreteDataType::string_datatype(),
            false,
            SemanticType::Timestamp,
            1,
        ));
        assert_build_error(builder, "column `ts` is not timestamp type");
    }

    #[test]
    fn test_empty_region_metadata() {
        // A region must have a time index.
        assert_build_error(create_builder(), "time index not found");
    }

    #[test]
    fn test_same_column_id() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(column(
                "a",
                ConcreteDataType::int64_datatype(),
                false,
                SemanticType::Tag,
                1,
            ))
            .push_column_metadata(time_index_column("b", 1));
        assert_build_error(builder, "column a and b have the same column id");
    }

    #[test]
    fn test_duplicate_time_index() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(time_index_column("a", 1))
            .push_column_metadata(time_index_column("b", 2));
        assert_build_error(builder, "expect only one time index");
    }

    #[test]
    fn test_unknown_primary_key() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(column(
                "a",
                ConcreteDataType::string_datatype(),
                false,
                SemanticType::Tag,
                1,
            ))
            .push_column_metadata(time_index_column("b", 2))
            .primary_key(vec![3]);
        assert_build_error(builder, "unknown column id 3");
    }

    #[test]
    fn test_same_primary_key() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(column(
                "a",
                ConcreteDataType::string_datatype(),
                false,
                SemanticType::Tag,
                1,
            ))
            .push_column_metadata(time_index_column("b", 2))
            .primary_key(vec![1, 1]);
        assert_build_error(builder, "duplicate column a in primary key");
    }

    #[test]
    fn test_in_time_index() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(time_index_column("ts", 1))
            .primary_key(vec![1]);
        assert_build_error(builder, "column ts is already a time index column");
    }

    #[test]
    fn test_nullable_time_index() {
        let mut builder = create_builder();
        builder.push_column_metadata(column(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
            SemanticType::Timestamp,
            1,
        ));
        assert_build_error(builder, "time index column ts must be NOT NULL");
    }

    #[test]
    fn test_primary_key_semantic_type() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(time_index_column("ts", 1))
            .push_column_metadata(column(
                "a",
                ConcreteDataType::float64_datatype(),
                true,
                SemanticType::Field,
                2,
            ))
            .primary_key(vec![2]);
        assert_build_error(builder, "semantic type of column a should be Tag, not Field");
    }

    #[test]
    fn test_primary_key_tag_num() {
        let mut builder = create_builder();
        builder
            .push_column_metadata(time_index_column("ts", 1))
            .push_column_metadata(new_column_metadata("a", true, 2))
            .push_column_metadata(new_column_metadata("b", true, 3))
            .primary_key(vec![2]);
        assert_build_error(
            builder,
            "number of primary key columns 1 not equal to tag columns 2",
        );
    }

    #[test]
    fn test_bump_version() {
        let mut expected = build_test_region_metadata();
        let mut builder = RegionMetadataBuilder::from_existing(expected.clone());
        builder.bump_version();
        let bumped = builder.build().unwrap();
        expected.schema_version += 1;
        assert_eq!(expected, bumped);
    }

    #[test]
    fn test_alter() {
        // a (tag), b (field), c (ts)
        let metadata = build_test_region_metadata();

        // Appends tag d; tags are appended to the primary key as well.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![add_column(new_column_metadata("d", true, 4), None)],
            },
        );
        check_columns(&metadata, &["a", "b", "c", "d"]);
        assert_eq!([1, 4], &metadata.primary_key[..]);

        // Inserts field e at the first position.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![add_column(
                    new_column_metadata("e", false, 5),
                    Some(AddColumnLocation::First),
                )],
            },
        );
        check_columns(&metadata, &["e", "a", "b", "c", "d"]);

        // Inserts field f after column b.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![add_column(
                    new_column_metadata("f", false, 6),
                    Some(AddColumnLocation::After {
                        column_name: "b".to_string(),
                    }),
                )],
            },
        );
        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);

        // Inserts field g after the last column d.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![add_column(
                    new_column_metadata("g", false, 7),
                    Some(AddColumnLocation::After {
                        column_name: "d".to_string(),
                    }),
                )],
            },
        );
        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);

        // Drops fields g and e.
        let metadata = alter_metadata(
            metadata,
            AlterKind::DropColumns {
                names: vec!["g".to_string(), "e".to_string()],
            },
        );
        check_columns(&metadata, &["a", "b", "f", "c", "d"]);

        // Dropping a is accepted by `alter`, but the build fails
        // as the primary key contains a.
        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
        builder
            .alter(AlterKind::DropColumns {
                names: vec!["a".to_string()],
            })
            .unwrap();
        let err = builder.build().unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());

        // Changes the type of b to string.
        let metadata = alter_metadata(
            metadata,
            AlterKind::ModifyColumnTypes {
                columns: vec![ModifyColumnType {
                    column_name: "b".to_string(),
                    target_type: ConcreteDataType::string_datatype(),
                }],
            },
        );
        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
        let b_type = &metadata
            .column_by_name("b")
            .unwrap()
            .column_schema
            .data_type;
        assert_eq!(ConcreteDataType::string_datatype(), *b_type);

        // Enables the fulltext index on b.
        let metadata = alter_metadata(
            metadata,
            AlterKind::SetIndex {
                options: ApiSetIndexOptions::Fulltext {
                    column_name: "b".to_string(),
                    options: FulltextOptions {
                        enable: true,
                        analyzer: FulltextAnalyzer::Chinese,
                        case_sensitive: true,
                        backend: FulltextBackend::Bloom,
                    },
                },
            },
        );
        let b_fulltext_options = metadata
            .column_by_name("b")
            .unwrap()
            .column_schema
            .fulltext_options()
            .unwrap()
            .unwrap();
        assert!(b_fulltext_options.enable);
        assert_eq!(FulltextAnalyzer::Chinese, b_fulltext_options.analyzer);
        assert!(b_fulltext_options.case_sensitive);

        // Disables the fulltext index on b; the other options are kept.
        let metadata = alter_metadata(
            metadata,
            AlterKind::UnsetIndex {
                options: ApiUnsetIndexOptions::Fulltext {
                    column_name: "b".to_string(),
                },
            },
        );
        let b_fulltext_options = metadata
            .column_by_name("b")
            .unwrap()
            .column_schema
            .fulltext_options()
            .unwrap()
            .unwrap();
        assert!(!b_fulltext_options.enable);
        assert_eq!(FulltextAnalyzer::Chinese, b_fulltext_options.analyzer);
        assert!(b_fulltext_options.case_sensitive);
    }

    #[test]
    fn test_add_if_not_exists() {
        // a (tag), b (field), c (ts)
        let metadata = build_test_region_metadata();

        // Adds tag d twice in the same request; it must be added only once.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![
                    add_column(new_column_metadata("d", true, 4), None),
                    add_column(new_column_metadata("d", true, 4), None),
                ],
            },
        );
        check_columns(&metadata, &["a", "b", "c", "d"]);
        assert_eq!([1, 4], &metadata.primary_key[..]);

        // Adds field b, which already exists.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![add_column(new_column_metadata("b", false, 2), None)],
            },
        );
        check_columns(&metadata, &["a", "b", "c", "d"]);
    }

    #[test]
    fn test_add_column_with_inverted_index() {
        // A column is inverted indexed only when the inverted index is
        // explicitly enabled on it.

        // a (tag), b (field), c (ts)
        let metadata = build_test_region_metadata();

        // Adds tags d and e; only d enables the inverted index.
        let mut indexed = new_column_metadata("d", true, 4);
        indexed.column_schema.set_inverted_index(true);
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![
                    add_column(indexed, None),
                    add_column(new_column_metadata("e", true, 5), None),
                ],
            },
        );
        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
        assert_eq!([1, 4, 5], &metadata.primary_key[..]);

        let expectations = [
            ("a", false),
            ("b", false),
            ("c", false),
            ("d", true),
            ("e", false),
        ];
        for (name, expected) in expectations {
            let column_metadata = metadata.column_by_name(name).unwrap();
            assert_eq!(
                expected,
                column_metadata.column_schema.is_inverted_indexed(),
                "column {name}",
            );
        }
    }

    #[test]
    fn test_drop_if_exists() {
        // a (tag), b (field), c (ts)
        let metadata = build_test_region_metadata();

        // Adds fields d and e.
        let metadata = alter_metadata(
            metadata,
            AlterKind::AddColumns {
                columns: vec![
                    add_column(new_column_metadata("d", false, 4), None),
                    add_column(new_column_metadata("e", false, 5), None),
                ],
            },
        );
        check_columns(&metadata, &["a", "b", "c", "d", "e"]);

        // Drops b twice in the same request.
        let metadata = alter_metadata(
            metadata,
            AlterKind::DropColumns {
                names: vec!["b".to_string(), "b".to_string()],
            },
        );
        check_columns(&metadata, &["a", "c", "d", "e"]);

        // Drops b, which no longer exists, together with e.
        let metadata = alter_metadata(
            metadata,
            AlterKind::DropColumns {
                names: vec!["b".to_string(), "e".to_string()],
            },
        );
        check_columns(&metadata, &["a", "c", "d"]);
    }

    #[test]
    fn test_invalid_column_name() {
        let mut builder = create_builder();
        builder.push_column_metadata(time_index_column("__sequence", 1));
        assert_build_error(builder, "internal column name that can not be used");
    }

    #[test]
    fn test_debug_for_column_metadata() {
        let region_metadata = build_test_region_metadata();
        let formatted = format!("{region_metadata:?}");
        assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
    }

    #[test]
    fn test_region_metadata_deserialize_default_primary_key_encoding() {
        // Without the `primary_key_encoding` key, the encoding is dense.
        let without_encoding = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
        let metadata: RegionMetadata = serde_json::from_str(without_encoding).unwrap();
        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Dense);

        // An explicit encoding is honored.
        let with_sparse_encoding = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
        let metadata: RegionMetadata = serde_json::from_str(with_sparse_encoding).unwrap();
        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
    }
}