store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::column_def::try_as_column_schema;
25use api::v1::region::RegionColumnDef;
26use api::v1::SemanticType;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, SkippingIndexOptions};
33use datatypes::types::TimestampType;
34use serde::de::Error;
35use serde::{Deserialize, Deserializer, Serialize};
36use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
37
38use crate::codec::PrimaryKeyEncoding;
39use crate::region_request::{
40    AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, ApiUnsetIndexOptions,
41    ModifyColumnType,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type returned by the fallible functions of this module; the error is [MetadataError].
46pub type Result<T> = std::result::Result<T, MetadataError>;
47
48/// Metadata of a column.
49#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ColumnMetadata {
51    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
52    pub column_schema: ColumnSchema,
53    /// Semantic type of this column (e.g. tag or timestamp).
54    pub semantic_type: SemanticType,
55    /// Id of this column. `RegionMetadata::validate` rejects two columns sharing the same id.
56    pub column_id: ColumnId,
57}
58
59impl fmt::Debug for ColumnMetadata {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(
62            f,
63            "[{:?} {:?} {:?}]",
64            self.column_schema, self.semantic_type, self.column_id,
65        )
66    }
67}
68
69impl ColumnMetadata {
70    /// Construct `Self` from protobuf struct [RegionColumnDef]
71    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72        let column_id = column_def.column_id;
73        let column_def = column_def
74            .column_def
75            .context(InvalidRawRegionRequestSnafu {
76                err: "column_def is absent",
77            })?;
78        let semantic_type = column_def.semantic_type();
79        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81        Ok(Self {
82            column_schema,
83            semantic_type,
84            column_id,
85        })
86    }
87
88    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
89    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90        serde_json::to_vec(columns)
91    }
92
93    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
94    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95        serde_json::from_slice(bytes)
96    }
97
98    pub fn is_same_datatype(&self, other: &Self) -> bool {
99        self.column_schema.data_type == other.column_schema.data_type
100    }
101}
102
103#[cfg_attr(doc, aquamarine::aquamarine)]
104/// General static metadata of a region.
105///
106/// This struct implements [Serialize] and [Deserialize] traits.
107/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
108///
109/// ```mermaid
110/// class RegionMetadata {
111///     +RegionId region_id
112///     +SchemaRef schema
113///     +Vec&lt;ColumnMetadata&gt; column_metadatas
114///     +Vec&lt;ColumnId&gt; primary_key
115/// }
116/// class Schema
117/// class ColumnMetadata {
118///     +ColumnSchema column_schema
119///     +SemanticType semantic_type
120///     +ColumnId column_id
121/// }
122/// class SemanticType
123/// RegionMetadata o-- Schema
124/// RegionMetadata o-- ColumnMetadata
125/// ColumnMetadata o-- SemanticType
126/// ```
127#[derive(Clone, PartialEq, Eq, Serialize)]
128pub struct RegionMetadata {
129    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
130    #[serde(skip)]
131    pub schema: SchemaRef,
132
133    // We don't pub `time_index` and `id_to_index` and always construct them via [SkippedFields]
134    // so we can assume they are valid.
135    /// Id of the time index column.
136    #[serde(skip)]
137    time_index: ColumnId,
138    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
139    #[serde(skip)]
140    id_to_index: HashMap<ColumnId, usize>,
141
142    /// Columns in the region. Has the same order as columns
143    /// in [schema](RegionMetadata::schema).
144    pub column_metadatas: Vec<ColumnMetadata>,
145    /// Maintains an ordered list of primary keys
146    pub primary_key: Vec<ColumnId>,
147
148    /// Immutable and unique id of a region.
149    pub region_id: RegionId,
150    /// Current version of the region schema.
151    ///
152    /// The version starts from 0. Altering the schema bumps the version.
153    pub schema_version: u64,
154
155    /// Primary key encoding mode.
156    pub primary_key_encoding: PrimaryKeyEncoding,
157}
158
159impl fmt::Debug for RegionMetadata {
160    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
161        f.debug_struct("RegionMetadata")
162            .field("column_metadatas", &self.column_metadatas)
163            .field("time_index", &self.time_index)
164            .field("primary_key", &self.primary_key)
165            .field("region_id", &self.region_id)
166            .field("schema_version", &self.schema_version)
167            .finish()
168    }
169}
170
/// Shared, reference-counted handle to a [RegionMetadata].
171pub type RegionMetadataRef = Arc<RegionMetadata>;
172
173impl<'de> Deserialize<'de> for RegionMetadata {
174    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
175    where
176        D: Deserializer<'de>,
177    {
178        // helper internal struct for deserialization
179        #[derive(Deserialize)]
180        struct RegionMetadataWithoutSchema {
181            column_metadatas: Vec<ColumnMetadata>,
182            primary_key: Vec<ColumnId>,
183            region_id: RegionId,
184            schema_version: u64,
185            #[serde(default)]
186            primary_key_encoding: PrimaryKeyEncoding,
187        }
188
189        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
190        let skipped =
191            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
192
193        Ok(Self {
194            schema: skipped.schema,
195            time_index: skipped.time_index,
196            id_to_index: skipped.id_to_index,
197            column_metadatas: without_schema.column_metadatas,
198            primary_key: without_schema.primary_key,
199            region_id: without_schema.region_id,
200            schema_version: without_schema.schema_version,
201            primary_key_encoding: without_schema.primary_key_encoding,
202        })
203    }
204}
205
206impl RegionMetadata {
207    /// Decode the metadata from a JSON str.
208    pub fn from_json(s: &str) -> Result<Self> {
209        serde_json::from_str(s).context(SerdeJsonSnafu)
210    }
211
212    /// Encode the metadata to a JSON string.
213    pub fn to_json(&self) -> Result<String> {
214        serde_json::to_string(&self).context(SerdeJsonSnafu)
215    }
216
217    /// Find column by id.
218    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
219        self.id_to_index
220            .get(&column_id)
221            .map(|index| &self.column_metadatas[*index])
222    }
223
224    /// Find column index by id.
225    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
226        self.id_to_index.get(&column_id).copied()
227    }
228
229    /// Find column index by name.
230    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
231        self.column_metadatas
232            .iter()
233            .position(|col| col.column_schema.name == column_name)
234    }
235
236    /// Returns the time index column
237    ///
238    /// # Panics
239    /// Panics if the time index column id is invalid.
240    pub fn time_index_column(&self) -> &ColumnMetadata {
241        let index = self.id_to_index[&self.time_index];
242        &self.column_metadatas[index]
243    }
244
245    /// Returns timestamp type of time index column
246    ///
247    /// # Panics
248    /// Panics if the time index column id is invalid.
249    pub fn time_index_type(&self) -> TimestampType {
250        let index = self.id_to_index[&self.time_index];
251        self.column_metadatas[index]
252            .column_schema
253            .data_type
254            .as_timestamp()
255            .unwrap()
256    }
257
258    /// Returns the position of the time index.
259    pub fn time_index_column_pos(&self) -> usize {
260        self.id_to_index[&self.time_index]
261    }
262
263    /// Returns the arrow field of the time index column.
264    pub fn time_index_field(&self) -> FieldRef {
265        let index = self.id_to_index[&self.time_index];
266        self.schema.arrow_schema().fields[index].clone()
267    }
268
269    /// Finds a column by name.
270    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
271        self.schema
272            .column_index_by_name(name)
273            .map(|index| &self.column_metadatas[index])
274    }
275
276    /// Returns all primary key columns.
277    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
278        // safety: RegionMetadata::validate ensures every primary key exists.
279        self.primary_key
280            .iter()
281            .map(|id| self.column_by_id(*id).unwrap())
282    }
283
284    /// Returns all field columns before projection.
285    ///
286    /// **Use with caution**. On read path where might have projection, this method
287    /// can return columns that not present in data batch.
288    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
289        self.column_metadatas
290            .iter()
291            .filter(|column| column.semantic_type == SemanticType::Field)
292    }
293
294    /// Returns a column's index in primary key if it is a primary key column.
295    ///
296    /// This does a linear search.
297    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
298        self.primary_key.iter().position(|id| *id == column_id)
299    }
300
301    /// Project the metadata to a new one using specified column ids.
302    ///
303    /// [RegionId] and schema version are preserved.
304    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
305        // check time index
306        ensure!(
307            projection.contains(&self.time_index),
308            TimeIndexNotFoundSnafu
309        );
310
311        // prepare new indices
312        let indices_to_preserve = projection
313            .iter()
314            .map(|id| {
315                self.column_index_by_id(*id)
316                    .with_context(|| InvalidRegionRequestSnafu {
317                        region_id: self.region_id,
318                        err: format!("column id {} not found", id),
319                    })
320            })
321            .collect::<Result<Vec<_>>>()?;
322
323        // project schema
324        let projected_schema =
325            self.schema
326                .try_project(&indices_to_preserve)
327                .with_context(|_| SchemaProjectSnafu {
328                    origin_schema: self.schema.clone(),
329                    projection: projection.to_vec(),
330                })?;
331
332        // project columns, generate projected primary key and new id_to_index
333        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
334        let mut projected_primary_key = vec![];
335        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
336        for index in indices_to_preserve {
337            let col = self.column_metadatas[index].clone();
338            if col.semantic_type == SemanticType::Tag {
339                projected_primary_key.push(col.column_id);
340            }
341            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
342            projected_column_metadatas.push(col);
343        }
344
345        Ok(RegionMetadata {
346            schema: Arc::new(projected_schema),
347            time_index: self.time_index,
348            id_to_index: projected_id_to_index,
349            column_metadatas: projected_column_metadatas,
350            primary_key: projected_primary_key,
351            region_id: self.region_id,
352            schema_version: self.schema_version,
353            primary_key_encoding: self.primary_key_encoding,
354        })
355    }
356
357    /// Gets the column ids to be indexed by inverted index.
358    pub fn inverted_indexed_column_ids<'a>(
359        &self,
360        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
361    ) -> HashSet<ColumnId> {
362        let mut inverted_index = self
363            .column_metadatas
364            .iter()
365            .filter(|column| column.column_schema.is_inverted_indexed())
366            .map(|column| column.column_id)
367            .collect::<HashSet<_>>();
368
369        for ignored in ignore_column_ids {
370            inverted_index.remove(ignored);
371        }
372
373        inverted_index
374    }
375
376    /// Checks whether the metadata is valid.
377    fn validate(&self) -> Result<()> {
378        // Id to name.
379        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
380        for col in &self.column_metadatas {
381            // Validate each column.
382            Self::validate_column_metadata(col)?;
383
384            // Check whether column id is duplicated. We already check column name
385            // is unique in `Schema` so we only check column id here.
386            ensure!(
387                !id_names.contains_key(&col.column_id),
388                InvalidMetaSnafu {
389                    reason: format!(
390                        "column {} and {} have the same column id {}",
391                        id_names[&col.column_id], col.column_schema.name, col.column_id,
392                    ),
393                }
394            );
395            id_names.insert(col.column_id, &col.column_schema.name);
396        }
397
398        // Checks there is only one time index.
399        let num_time_index = self
400            .column_metadatas
401            .iter()
402            .filter(|col| col.semantic_type == SemanticType::Timestamp)
403            .count();
404        ensure!(
405            num_time_index == 1,
406            InvalidMetaSnafu {
407                reason: format!("expect only one time index, found {}", num_time_index),
408            }
409        );
410
411        // Checks the time index column is not nullable.
412        ensure!(
413            !self.time_index_column().column_schema.is_nullable(),
414            InvalidMetaSnafu {
415                reason: format!(
416                    "time index column {} must be NOT NULL",
417                    self.time_index_column().column_schema.name
418                ),
419            }
420        );
421
422        if !self.primary_key.is_empty() {
423            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
424            // Checks column ids in the primary key is valid.
425            for column_id in &self.primary_key {
426                // Checks whether the column id exists.
427                ensure!(
428                    id_names.contains_key(column_id),
429                    InvalidMetaSnafu {
430                        reason: format!("unknown column id {}", column_id),
431                    }
432                );
433
434                // Safety: Column with specific id must exist.
435                let column = self.column_by_id(*column_id).unwrap();
436                // Checks duplicate.
437                ensure!(
438                    !pk_ids.contains(&column_id),
439                    InvalidMetaSnafu {
440                        reason: format!(
441                            "duplicate column {} in primary key",
442                            column.column_schema.name
443                        ),
444                    }
445                );
446
447                // Checks this is not a time index column.
448                ensure!(
449                    *column_id != self.time_index,
450                    InvalidMetaSnafu {
451                        reason: format!(
452                            "column {} is already a time index column",
453                            column.column_schema.name,
454                        ),
455                    }
456                );
457
458                // Checks semantic type.
459                ensure!(
460                    column.semantic_type == SemanticType::Tag,
461                    InvalidMetaSnafu {
462                        reason: format!(
463                            "semantic type of column {} should be Tag, not {:?}",
464                            column.column_schema.name, column.semantic_type
465                        ),
466                    }
467                );
468
469                pk_ids.insert(column_id);
470            }
471        }
472
473        // Checks tag semantic type.
474        let num_tag = self
475            .column_metadatas
476            .iter()
477            .filter(|col| col.semantic_type == SemanticType::Tag)
478            .count();
479        ensure!(
480            num_tag == self.primary_key.len(),
481            InvalidMetaSnafu {
482                reason: format!(
483                    "number of primary key columns {} not equal to tag columns {}",
484                    self.primary_key.len(),
485                    num_tag
486                ),
487            }
488        );
489
490        Ok(())
491    }
492
493    /// Checks whether it is a valid column.
494    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
495        if column_metadata.semantic_type == SemanticType::Timestamp {
496            ensure!(
497                column_metadata.column_schema.data_type.is_timestamp(),
498                InvalidMetaSnafu {
499                    reason: format!(
500                        "column `{}` is not timestamp type",
501                        column_metadata.column_schema.name
502                    ),
503                }
504            );
505        }
506
507        ensure!(
508            !is_internal_column(&column_metadata.column_schema.name),
509            InvalidMetaSnafu {
510                reason: format!(
511                    "{} is internal column name that can not be used",
512                    column_metadata.column_schema.name
513                ),
514            }
515        );
516
517        Ok(())
518    }
519}
520
521/// Builder to build [RegionMetadata].
522pub struct RegionMetadataBuilder {
    /// Id of the region the metadata belongs to.
523    region_id: RegionId,
    /// Columns collected so far, in the order they will have in the built metadata.
524    column_metadatas: Vec<ColumnMetadata>,
    /// Column ids that make up the primary key, in order.
525    primary_key: Vec<ColumnId>,
    /// Schema version that the built metadata will carry.
526    schema_version: u64,
    /// Primary key encoding mode that the built metadata will carry.
527    primary_key_encoding: PrimaryKeyEncoding,
528}
529
530impl RegionMetadataBuilder {
531    /// Returns a new builder.
532    pub fn new(id: RegionId) -> Self {
533        Self {
534            region_id: id,
535            column_metadatas: vec![],
536            primary_key: vec![],
537            schema_version: 0,
538            primary_key_encoding: PrimaryKeyEncoding::Dense,
539        }
540    }
541
542    /// Creates a builder from existing [RegionMetadata].
543    pub fn from_existing(existing: RegionMetadata) -> Self {
544        Self {
545            column_metadatas: existing.column_metadatas,
546            primary_key: existing.primary_key,
547            region_id: existing.region_id,
548            schema_version: existing.schema_version,
549            primary_key_encoding: existing.primary_key_encoding,
550        }
551    }
552
553    /// Sets the primary key encoding mode.
554    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
555        self.primary_key_encoding = encoding;
556        self
557    }
558
559    /// Pushes a new column metadata to this region's metadata.
560    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
561        self.column_metadatas.push(column_metadata);
562        self
563    }
564
565    /// Sets the primary key of the region.
566    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
567        self.primary_key = key;
568        self
569    }
570
571    /// Increases the schema version by 1.
572    pub fn bump_version(&mut self) -> &mut Self {
573        self.schema_version += 1;
574        self
575    }
576
577    /// Applies the alter `kind` to the builder.
578    ///
579    /// The `kind` should be valid.
580    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
581        match kind {
582            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
583            AlterKind::DropColumns { names } => self.drop_columns(&names),
584            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
585            AlterKind::SetIndex { options } => match options {
586                ApiSetIndexOptions::Fulltext {
587                    column_name,
588                    options,
589                } => self.change_column_fulltext_options(column_name, true, Some(options))?,
590                ApiSetIndexOptions::Inverted { column_name } => {
591                    self.change_column_inverted_index_options(column_name, true)?
592                }
593                ApiSetIndexOptions::Skipping {
594                    column_name,
595                    options,
596                } => self.change_column_skipping_index_options(column_name, Some(options))?,
597            },
598            AlterKind::UnsetIndex { options } => match options {
599                ApiUnsetIndexOptions::Fulltext { column_name } => {
600                    self.change_column_fulltext_options(column_name, false, None)?
601                }
602                ApiUnsetIndexOptions::Inverted { column_name } => {
603                    self.change_column_inverted_index_options(column_name, false)?
604                }
605                ApiUnsetIndexOptions::Skipping { column_name } => {
606                    self.change_column_skipping_index_options(column_name, None)?
607                }
608            },
609            AlterKind::SetRegionOptions { options: _ } => {
610                // nothing to be done with RegionMetadata
611            }
612            AlterKind::UnsetRegionOptions { keys: _ } => {
613                // nothing to be done with RegionMetadata
614            }
615            AlterKind::DropDefaults { names } => {
616                self.drop_defaults(names)?;
617            }
618        }
619        Ok(self)
620    }
621
622    /// Consumes the builder and build a [RegionMetadata].
623    pub fn build(self) -> Result<RegionMetadata> {
624        let skipped = SkippedFields::new(&self.column_metadatas)?;
625
626        let meta = RegionMetadata {
627            schema: skipped.schema,
628            time_index: skipped.time_index,
629            id_to_index: skipped.id_to_index,
630            column_metadatas: self.column_metadatas,
631            primary_key: self.primary_key,
632            region_id: self.region_id,
633            schema_version: self.schema_version,
634            primary_key_encoding: self.primary_key_encoding,
635        };
636
637        meta.validate()?;
638
639        Ok(meta)
640    }
641
642    /// Adds columns to the metadata if not exist.
643    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
644        let mut names: HashSet<_> = self
645            .column_metadatas
646            .iter()
647            .map(|col| col.column_schema.name.clone())
648            .collect();
649
650        for add_column in columns {
651            if names.contains(&add_column.column_metadata.column_schema.name) {
652                // Column already exists.
653                continue;
654            }
655
656            let column_id = add_column.column_metadata.column_id;
657            let semantic_type = add_column.column_metadata.semantic_type;
658            let column_name = add_column.column_metadata.column_schema.name.clone();
659            match add_column.location {
660                None => {
661                    self.column_metadatas.push(add_column.column_metadata);
662                }
663                Some(AddColumnLocation::First) => {
664                    self.column_metadatas.insert(0, add_column.column_metadata);
665                }
666                Some(AddColumnLocation::After { column_name }) => {
667                    let pos = self
668                        .column_metadatas
669                        .iter()
670                        .position(|col| col.column_schema.name == column_name)
671                        .context(InvalidRegionRequestSnafu {
672                            region_id: self.region_id,
673                            err: format!(
674                                "column {} not found, failed to add column {} after it",
675                                column_name, add_column.column_metadata.column_schema.name
676                            ),
677                        })?;
678                    // Insert after pos.
679                    self.column_metadatas
680                        .insert(pos + 1, add_column.column_metadata);
681                }
682            }
683            names.insert(column_name);
684            if semantic_type == SemanticType::Tag {
685                // For a new tag, we extend the primary key.
686                self.primary_key.push(column_id);
687            }
688        }
689
690        Ok(())
691    }
692
693    /// Drops columns from the metadata if exist.
694    fn drop_columns(&mut self, names: &[String]) {
695        let name_set: HashSet<_> = names.iter().collect();
696        self.column_metadatas
697            .retain(|col| !name_set.contains(&col.column_schema.name));
698    }
699
700    /// Changes columns type to the metadata if exist.
701    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
702        let mut change_type_map: HashMap<_, _> = columns
703            .into_iter()
704            .map(
705                |ModifyColumnType {
706                     column_name,
707                     target_type,
708                 }| (column_name, target_type),
709            )
710            .collect();
711
712        for column_meta in self.column_metadatas.iter_mut() {
713            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
714                column_meta.column_schema.data_type = target_type.clone();
715                // also cast default value to target_type if default value exist
716                let new_default =
717                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
718                        Some(
719                            default_value
720                                .cast_to_datatype(&target_type)
721                                .with_context(|_| CastDefaultValueSnafu {
722                                    reason: format!(
723                                        "Failed to cast default value from {:?} to type {:?}",
724                                        default_value, target_type
725                                    ),
726                                })?,
727                        )
728                    } else {
729                        None
730                    };
731                column_meta.column_schema = column_meta
732                    .column_schema
733                    .clone()
734                    .with_default_constraint(new_default.clone())
735                    .with_context(|_| CastDefaultValueSnafu {
736                        reason: format!("Failed to set new default: {:?}", new_default),
737                    })?;
738            }
739        }
740
741        Ok(())
742    }
743
744    fn change_column_inverted_index_options(
745        &mut self,
746        column_name: String,
747        value: bool,
748    ) -> Result<()> {
749        for column_meta in self.column_metadatas.iter_mut() {
750            if column_meta.column_schema.name == column_name {
751                column_meta.column_schema.set_inverted_index(value)
752            }
753        }
754        Ok(())
755    }
756
757    fn change_column_fulltext_options(
758        &mut self,
759        column_name: String,
760        enable: bool,
761        options: Option<FulltextOptions>,
762    ) -> Result<()> {
763        for column_meta in self.column_metadatas.iter_mut() {
764            if column_meta.column_schema.name == column_name {
765                ensure!(
766                    column_meta.column_schema.data_type.is_string(),
767                    InvalidColumnOptionSnafu {
768                        column_name,
769                        msg: "FULLTEXT index only supports string type".to_string(),
770                    }
771                );
772
773                let current_fulltext_options = column_meta
774                    .column_schema
775                    .fulltext_options()
776                    .context(SetFulltextOptionsSnafu {
777                        column_name: column_name.clone(),
778                    })?;
779
780                if enable {
781                    ensure!(
782                        options.is_some(),
783                        InvalidColumnOptionSnafu {
784                            column_name,
785                            msg: "FULLTEXT index options must be provided",
786                        }
787                    );
788                    set_column_fulltext_options(
789                        column_meta,
790                        column_name,
791                        options.unwrap(),
792                        current_fulltext_options,
793                    )?;
794                } else {
795                    unset_column_fulltext_options(
796                        column_meta,
797                        column_name,
798                        current_fulltext_options,
799                    )?;
800                }
801                break;
802            }
803        }
804        Ok(())
805    }
806
807    fn change_column_skipping_index_options(
808        &mut self,
809        column_name: String,
810        options: Option<SkippingIndexOptions>,
811    ) -> Result<()> {
812        for column_meta in self.column_metadatas.iter_mut() {
813            if column_meta.column_schema.name == column_name {
814                if let Some(options) = &options {
815                    column_meta
816                        .column_schema
817                        .set_skipping_options(options)
818                        .context(UnsetSkippingIndexOptionsSnafu {
819                            column_name: column_name.clone(),
820                        })?;
821                } else {
822                    column_meta.column_schema.unset_skipping_options().context(
823                        UnsetSkippingIndexOptionsSnafu {
824                            column_name: column_name.clone(),
825                        },
826                    )?;
827                }
828            }
829        }
830        Ok(())
831    }
832
833    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
834        for name in column_names.iter() {
835            let meta = self
836                .column_metadatas
837                .iter_mut()
838                .find(|col| col.column_schema.name == *name);
839            if let Some(meta) = meta {
840                if !meta.column_schema.is_nullable() {
841                    return InvalidRegionRequestSnafu {
842                        region_id: self.region_id,
843                        err: format!(
844                            "column {name} is not nullable and `default` cannot be dropped",
845                        ),
846                    }
847                    .fail();
848                }
849                meta.column_schema = meta
850                    .column_schema
851                    .clone()
852                    .with_default_constraint(None)
853                    .with_context(|_| CastDefaultValueSnafu {
854                        reason: format!("Failed to drop default : {name:?}"),
855                    })?;
856            } else {
857                return InvalidRegionRequestSnafu {
858                    region_id: self.region_id,
859                    err: format!("column {name} not found",),
860                }
861                .fail();
862            }
863        }
864        Ok(())
865    }
866}
867
/// Fields skipped in serialization.
///
/// All of them are derived from the column metadatas by [SkippedFields::new].
struct SkippedFields {
    /// Last schema, assembled from the column schemas of all column metadatas.
    schema: SchemaRef,
    /// Id of the time index column, i.e. the first column whose semantic type is
    /// [SemanticType::Timestamp].
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    id_to_index: HashMap<ColumnId, usize>,
}
877
878impl SkippedFields {
879    /// Constructs skipped fields from `column_metadatas`.
880    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
881        let column_schemas = column_metadatas
882            .iter()
883            .map(|column_metadata| column_metadata.column_schema.clone())
884            .collect();
885        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
886        let time_index = column_metadatas
887            .iter()
888            .find_map(|col| {
889                if col.semantic_type == SemanticType::Timestamp {
890                    Some(col.column_id)
891                } else {
892                    None
893                }
894            })
895            .context(InvalidMetaSnafu {
896                reason: "time index not found",
897            })?;
898        let id_to_index = column_metadatas
899            .iter()
900            .enumerate()
901            .map(|(idx, col)| (col.column_id, idx))
902            .collect();
903
904        Ok(SkippedFields {
905            schema,
906            time_index,
907            id_to_index,
908        })
909    }
910}
911
912#[derive(Snafu)]
913#[snafu(visibility(pub))]
914#[stack_trace_debug]
915pub enum MetadataError {
916    #[snafu(display("Invalid schema"))]
917    InvalidSchema {
918        source: datatypes::error::Error,
919        #[snafu(implicit)]
920        location: Location,
921    },
922
923    #[snafu(display("Invalid metadata, {}", reason))]
924    InvalidMeta {
925        reason: String,
926        #[snafu(implicit)]
927        location: Location,
928    },
929
930    #[snafu(display("Failed to ser/de json object"))]
931    SerdeJson {
932        #[snafu(implicit)]
933        location: Location,
934        #[snafu(source)]
935        error: serde_json::Error,
936    },
937
938    #[snafu(display("Invalid raw region request, err: {}", err))]
939    InvalidRawRegionRequest {
940        err: String,
941        #[snafu(implicit)]
942        location: Location,
943    },
944
945    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
946    InvalidRegionRequest {
947        region_id: RegionId,
948        err: String,
949        #[snafu(implicit)]
950        location: Location,
951    },
952
953    #[snafu(display("Unexpected schema error during project"))]
954    SchemaProject {
955        origin_schema: SchemaRef,
956        projection: Vec<ColumnId>,
957        #[snafu(implicit)]
958        location: Location,
959        source: datatypes::Error,
960    },
961
962    #[snafu(display("Time index column not found"))]
963    TimeIndexNotFound {
964        #[snafu(implicit)]
965        location: Location,
966    },
967
968    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
969    ChangeColumnNotFound {
970        column_name: String,
971        region_id: RegionId,
972        #[snafu(implicit)]
973        location: Location,
974    },
975
976    #[snafu(display("Failed to convert column schema"))]
977    ConvertColumnSchema {
978        source: api::error::Error,
979        #[snafu(implicit)]
980        location: Location,
981    },
982
983    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
984    InvalidSetRegionOptionRequest {
985        key: String,
986        value: String,
987        #[snafu(implicit)]
988        location: Location,
989    },
990
991    #[snafu(display("Invalid set region option request, key: {}", key))]
992    InvalidUnsetRegionOptionRequest {
993        key: String,
994        #[snafu(implicit)]
995        location: Location,
996    },
997
998    #[snafu(display("Failed to decode protobuf"))]
999    DecodeProto {
1000        #[snafu(source)]
1001        error: prost::UnknownEnumValue,
1002        #[snafu(implicit)]
1003        location: Location,
1004    },
1005
1006    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1007    InvalidColumnOption {
1008        column_name: String,
1009        msg: String,
1010        #[snafu(implicit)]
1011        location: Location,
1012    },
1013
1014    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1015    SetFulltextOptions {
1016        column_name: String,
1017        source: datatypes::Error,
1018        #[snafu(implicit)]
1019        location: Location,
1020    },
1021
1022    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1023    SetSkippingIndexOptions {
1024        column_name: String,
1025        source: datatypes::Error,
1026        #[snafu(implicit)]
1027        location: Location,
1028    },
1029
1030    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1031    UnsetSkippingIndexOptions {
1032        column_name: String,
1033        source: datatypes::Error,
1034        #[snafu(implicit)]
1035        location: Location,
1036    },
1037
1038    #[snafu(display("Failed to decode arrow ipc record batches"))]
1039    DecodeArrowIpc {
1040        #[snafu(source)]
1041        error: arrow::error::ArrowError,
1042        #[snafu(implicit)]
1043        location: Location,
1044    },
1045
1046    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1047    CastDefaultValue {
1048        reason: String,
1049        source: datatypes::Error,
1050        #[snafu(implicit)]
1051        location: Location,
1052    },
1053
1054    #[snafu(display("Unexpected: {}", reason))]
1055    Unexpected {
1056        reason: String,
1057        #[snafu(implicit)]
1058        location: Location,
1059    },
1060
1061    #[snafu(display("Failed to encode/decode flight message"))]
1062    FlightCodec {
1063        source: common_grpc::Error,
1064        #[snafu(implicit)]
1065        location: Location,
1066    },
1067
1068    #[snafu(display("Invalid index option"))]
1069    InvalidIndexOption {
1070        #[snafu(implicit)]
1071        location: Location,
1072        #[snafu(source)]
1073        error: datatypes::error::Error,
1074    },
1075}
1076
impl ErrorExt for MetadataError {
    /// Reports every metadata error, whatever the variant, as
    /// [StatusCode::InvalidArguments].
    fn status_code(&self) -> StatusCode {
        StatusCode::InvalidArguments
    }

    /// Returns this error as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
1086
1087/// Set column fulltext options if it passed the validation.
1088///
1089/// Options allowed to modify:
1090/// * backend
1091///
1092/// Options not allowed to modify:
1093/// * analyzer
1094/// * case_sensitive
1095fn set_column_fulltext_options(
1096    column_meta: &mut ColumnMetadata,
1097    column_name: String,
1098    options: FulltextOptions,
1099    current_options: Option<FulltextOptions>,
1100) -> Result<()> {
1101    if let Some(current_options) = current_options {
1102        ensure!(
1103            current_options.analyzer == options.analyzer
1104                && current_options.case_sensitive == options.case_sensitive,
1105            InvalidColumnOptionSnafu {
1106                column_name,
1107                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1108                current_options.analyzer, current_options.case_sensitive),
1109            }
1110        );
1111    }
1112
1113    column_meta
1114        .column_schema
1115        .set_fulltext_options(&options)
1116        .context(SetFulltextOptionsSnafu { column_name })?;
1117
1118    Ok(())
1119}
1120
1121fn unset_column_fulltext_options(
1122    column_meta: &mut ColumnMetadata,
1123    column_name: String,
1124    current_options: Option<FulltextOptions>,
1125) -> Result<()> {
1126    if let Some(mut current_options) = current_options
1127        && current_options.enable
1128    {
1129        current_options.enable = false;
1130        column_meta
1131            .column_schema
1132            .set_fulltext_options(&current_options)
1133            .context(SetFulltextOptionsSnafu { column_name })?;
1134    } else {
1135        return InvalidColumnOptionSnafu {
1136            column_name,
1137            msg: "FULLTEXT index already disabled",
1138        }
1139        .fail();
1140    }
1141
1142    Ok(())
1143}
1144
1145#[cfg(test)]
1146mod test {
1147    use datatypes::prelude::ConcreteDataType;
1148    use datatypes::schema::{
1149        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1150    };
1151    use datatypes::value::Value;
1152
1153    use super::*;
1154
1155    fn create_builder() -> RegionMetadataBuilder {
1156        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1157    }
1158
1159    fn build_test_region_metadata() -> RegionMetadata {
1160        let mut builder = create_builder();
1161        builder
1162            .push_column_metadata(ColumnMetadata {
1163                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1164                semantic_type: SemanticType::Tag,
1165                column_id: 1,
1166            })
1167            .push_column_metadata(ColumnMetadata {
1168                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1169                semantic_type: SemanticType::Field,
1170                column_id: 2,
1171            })
1172            .push_column_metadata(ColumnMetadata {
1173                column_schema: ColumnSchema::new(
1174                    "c",
1175                    ConcreteDataType::timestamp_millisecond_datatype(),
1176                    false,
1177                ),
1178                semantic_type: SemanticType::Timestamp,
1179                column_id: 3,
1180            })
1181            .primary_key(vec![1]);
1182        builder.build().unwrap()
1183    }
1184
1185    #[test]
1186    fn test_region_metadata() {
1187        let region_metadata = build_test_region_metadata();
1188        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1189        assert_eq!(
1190            "a",
1191            region_metadata.column_by_id(1).unwrap().column_schema.name
1192        );
1193        assert_eq!(None, region_metadata.column_by_id(10));
1194    }
1195
1196    #[test]
1197    fn test_region_metadata_serde() {
1198        let region_metadata = build_test_region_metadata();
1199        let serialized = serde_json::to_string(&region_metadata).unwrap();
1200        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1201        assert_eq!(region_metadata, deserialized);
1202    }
1203
1204    #[test]
1205    fn test_column_metadata_validate() {
1206        let mut builder = create_builder();
1207        let col = ColumnMetadata {
1208            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1209            semantic_type: SemanticType::Timestamp,
1210            column_id: 1,
1211        };
1212
1213        builder.push_column_metadata(col);
1214        let err = builder.build().unwrap_err();
1215        assert!(
1216            err.to_string()
1217                .contains("column `ts` is not timestamp type"),
1218            "unexpected err: {err}",
1219        );
1220    }
1221
1222    #[test]
1223    fn test_empty_region_metadata() {
1224        let builder = create_builder();
1225        let err = builder.build().unwrap_err();
1226        // A region must have a time index.
1227        assert!(
1228            err.to_string().contains("time index not found"),
1229            "unexpected err: {err}",
1230        );
1231    }
1232
1233    #[test]
1234    fn test_same_column_id() {
1235        let mut builder = create_builder();
1236        builder
1237            .push_column_metadata(ColumnMetadata {
1238                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1239                semantic_type: SemanticType::Tag,
1240                column_id: 1,
1241            })
1242            .push_column_metadata(ColumnMetadata {
1243                column_schema: ColumnSchema::new(
1244                    "b",
1245                    ConcreteDataType::timestamp_millisecond_datatype(),
1246                    false,
1247                ),
1248                semantic_type: SemanticType::Timestamp,
1249                column_id: 1,
1250            });
1251        let err = builder.build().unwrap_err();
1252        assert!(
1253            err.to_string()
1254                .contains("column a and b have the same column id"),
1255            "unexpected err: {err}",
1256        );
1257    }
1258
1259    #[test]
1260    fn test_duplicate_time_index() {
1261        let mut builder = create_builder();
1262        builder
1263            .push_column_metadata(ColumnMetadata {
1264                column_schema: ColumnSchema::new(
1265                    "a",
1266                    ConcreteDataType::timestamp_millisecond_datatype(),
1267                    false,
1268                ),
1269                semantic_type: SemanticType::Timestamp,
1270                column_id: 1,
1271            })
1272            .push_column_metadata(ColumnMetadata {
1273                column_schema: ColumnSchema::new(
1274                    "b",
1275                    ConcreteDataType::timestamp_millisecond_datatype(),
1276                    false,
1277                ),
1278                semantic_type: SemanticType::Timestamp,
1279                column_id: 2,
1280            });
1281        let err = builder.build().unwrap_err();
1282        assert!(
1283            err.to_string().contains("expect only one time index"),
1284            "unexpected err: {err}",
1285        );
1286    }
1287
1288    #[test]
1289    fn test_unknown_primary_key() {
1290        let mut builder = create_builder();
1291        builder
1292            .push_column_metadata(ColumnMetadata {
1293                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1294                semantic_type: SemanticType::Tag,
1295                column_id: 1,
1296            })
1297            .push_column_metadata(ColumnMetadata {
1298                column_schema: ColumnSchema::new(
1299                    "b",
1300                    ConcreteDataType::timestamp_millisecond_datatype(),
1301                    false,
1302                ),
1303                semantic_type: SemanticType::Timestamp,
1304                column_id: 2,
1305            })
1306            .primary_key(vec![3]);
1307        let err = builder.build().unwrap_err();
1308        assert!(
1309            err.to_string().contains("unknown column id 3"),
1310            "unexpected err: {err}",
1311        );
1312    }
1313
1314    #[test]
1315    fn test_same_primary_key() {
1316        let mut builder = create_builder();
1317        builder
1318            .push_column_metadata(ColumnMetadata {
1319                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1320                semantic_type: SemanticType::Tag,
1321                column_id: 1,
1322            })
1323            .push_column_metadata(ColumnMetadata {
1324                column_schema: ColumnSchema::new(
1325                    "b",
1326                    ConcreteDataType::timestamp_millisecond_datatype(),
1327                    false,
1328                ),
1329                semantic_type: SemanticType::Timestamp,
1330                column_id: 2,
1331            })
1332            .primary_key(vec![1, 1]);
1333        let err = builder.build().unwrap_err();
1334        assert!(
1335            err.to_string()
1336                .contains("duplicate column a in primary key"),
1337            "unexpected err: {err}",
1338        );
1339    }
1340
1341    #[test]
1342    fn test_in_time_index() {
1343        let mut builder = create_builder();
1344        builder
1345            .push_column_metadata(ColumnMetadata {
1346                column_schema: ColumnSchema::new(
1347                    "ts",
1348                    ConcreteDataType::timestamp_millisecond_datatype(),
1349                    false,
1350                ),
1351                semantic_type: SemanticType::Timestamp,
1352                column_id: 1,
1353            })
1354            .primary_key(vec![1]);
1355        let err = builder.build().unwrap_err();
1356        assert!(
1357            err.to_string()
1358                .contains("column ts is already a time index column"),
1359            "unexpected err: {err}",
1360        );
1361    }
1362
1363    #[test]
1364    fn test_nullable_time_index() {
1365        let mut builder = create_builder();
1366        builder.push_column_metadata(ColumnMetadata {
1367            column_schema: ColumnSchema::new(
1368                "ts",
1369                ConcreteDataType::timestamp_millisecond_datatype(),
1370                true,
1371            ),
1372            semantic_type: SemanticType::Timestamp,
1373            column_id: 1,
1374        });
1375        let err = builder.build().unwrap_err();
1376        assert!(
1377            err.to_string()
1378                .contains("time index column ts must be NOT NULL"),
1379            "unexpected err: {err}",
1380        );
1381    }
1382
1383    #[test]
1384    fn test_primary_key_semantic_type() {
1385        let mut builder = create_builder();
1386        builder
1387            .push_column_metadata(ColumnMetadata {
1388                column_schema: ColumnSchema::new(
1389                    "ts",
1390                    ConcreteDataType::timestamp_millisecond_datatype(),
1391                    false,
1392                ),
1393                semantic_type: SemanticType::Timestamp,
1394                column_id: 1,
1395            })
1396            .push_column_metadata(ColumnMetadata {
1397                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1398                semantic_type: SemanticType::Field,
1399                column_id: 2,
1400            })
1401            .primary_key(vec![2]);
1402        let err = builder.build().unwrap_err();
1403        assert!(
1404            err.to_string()
1405                .contains("semantic type of column a should be Tag, not Field"),
1406            "unexpected err: {err}",
1407        );
1408    }
1409
1410    #[test]
1411    fn test_primary_key_tag_num() {
1412        let mut builder = create_builder();
1413        builder
1414            .push_column_metadata(ColumnMetadata {
1415                column_schema: ColumnSchema::new(
1416                    "ts",
1417                    ConcreteDataType::timestamp_millisecond_datatype(),
1418                    false,
1419                ),
1420                semantic_type: SemanticType::Timestamp,
1421                column_id: 1,
1422            })
1423            .push_column_metadata(ColumnMetadata {
1424                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1425                semantic_type: SemanticType::Tag,
1426                column_id: 2,
1427            })
1428            .push_column_metadata(ColumnMetadata {
1429                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1430                semantic_type: SemanticType::Tag,
1431                column_id: 3,
1432            })
1433            .primary_key(vec![2]);
1434        let err = builder.build().unwrap_err();
1435        assert!(
1436            err.to_string()
1437                .contains("number of primary key columns 1 not equal to tag columns 2"),
1438            "unexpected err: {err}",
1439        );
1440    }
1441
1442    #[test]
1443    fn test_bump_version() {
1444        let mut region_metadata = build_test_region_metadata();
1445        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1446        builder.bump_version();
1447        let new_meta = builder.build().unwrap();
1448        region_metadata.schema_version += 1;
1449        assert_eq!(region_metadata, new_meta);
1450    }
1451
1452    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1453        let semantic_type = if is_tag {
1454            SemanticType::Tag
1455        } else {
1456            SemanticType::Field
1457        };
1458        ColumnMetadata {
1459            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1460            semantic_type,
1461            column_id,
1462        }
1463    }
1464
1465    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1466        let actual: Vec<_> = metadata
1467            .column_metadatas
1468            .iter()
1469            .map(|col| &col.column_schema.name)
1470            .collect();
1471        assert_eq!(names, actual);
1472    }
1473
1474    fn get_columns_default_constraint(
1475        metadata: &RegionMetadata,
1476        name: String,
1477    ) -> Option<Option<&ColumnDefaultConstraint>> {
1478        metadata.column_metadatas.iter().find_map(|col| {
1479            if col.column_schema.name == name {
1480                Some(col.column_schema.default_constraint())
1481            } else {
1482                None
1483            }
1484        })
1485    }
1486
    /// Walks one region metadata through a sequence of alterations (add/drop columns,
    /// drop defaults, modify a column type, set/unset a FULLTEXT index). Each step starts
    /// from the metadata produced by the previous successful step.
    #[test]
    fn test_alter() {
        // a (tag), b (field), c (ts)
        let metadata = build_test_region_metadata();
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        // tag d
        builder
            .alter(AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: new_column_metadata("d", true, 4),
                    location: None,
                }],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["a", "b", "c", "d"]);
        // The new tag is appended to the primary key.
        assert_eq!([1, 4], &metadata.primary_key[..]);

        // field e, placed first
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: new_column_metadata("e", false, 5),
                    location: Some(AddColumnLocation::First),
                }],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["e", "a", "b", "c", "d"]);

        // field f, placed after b (in the middle)
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: new_column_metadata("f", false, 6),
                    location: Some(AddColumnLocation::After {
                        column_name: "b".to_string(),
                    }),
                }],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);

        // field g, placed after d (the last column)
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: new_column_metadata("g", false, 7),
                    location: Some(AddColumnLocation::After {
                        column_name: "d".to_string(),
                    }),
                }],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);

        // Drop the two fields g and e in one request.
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::DropColumns {
                names: vec!["g".to_string(), "e".to_string()],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["a", "b", "f", "c", "d"]);

        // Dropping tag a: the builder works on a clone so `metadata` stays usable below.
        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
        builder
            .alter(AlterKind::DropColumns {
                names: vec!["a".to_string()],
            })
            .unwrap();
        // Build returns error as the primary key contains a.
        let err = builder.build().unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());

        // Add field g again (new id 8), this time with a default value.
        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
        let mut column_metadata = new_column_metadata("g", false, 8);
        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
        column_metadata.column_schema = column_metadata
            .column_schema
            .with_default_constraint(default_constraint.clone())
            .unwrap();
        builder
            .alter(AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata,
                    location: None,
                }],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        assert_eq!(
            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
            default_constraint.as_ref()
        );
        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

        // Drop the default of g; the column itself stays.
        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::DropDefaults {
                names: vec!["g".to_string()],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        assert_eq!(
            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
            None
        );
        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

        // Drop g again.
        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::DropColumns {
                names: vec!["g".to_string()],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["a", "b", "f", "c", "d"]);

        // Change the type of b from float64 to string; the column order is unchanged.
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::ModifyColumnTypes {
                columns: vec![ModifyColumnType {
                    column_name: "b".to_string(),
                    target_type: ConcreteDataType::string_datatype(),
                }],
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
        let b_type = &metadata
            .column_by_name("b")
            .unwrap()
            .column_schema
            .data_type;
        assert_eq!(ConcreteDataType::string_datatype(), *b_type);

        // Set a FULLTEXT index on b, which was just turned into a string column.
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::SetIndex {
                options: ApiSetIndexOptions::Fulltext {
                    column_name: "b".to_string(),
                    options: FulltextOptions::new_unchecked(
                        true,
                        FulltextAnalyzer::Chinese,
                        true,
                        FulltextBackend::Bloom,
                        1000,
                        0.01,
                    ),
                },
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        // NOTE: despite the `a_` prefix, these are the fulltext options of column b.
        let a_fulltext_options = metadata
            .column_by_name("b")
            .unwrap()
            .column_schema
            .fulltext_options()
            .unwrap()
            .unwrap();
        assert!(a_fulltext_options.enable);
        assert_eq!(
            datatypes::schema::FulltextAnalyzer::Chinese,
            a_fulltext_options.analyzer
        );
        assert!(a_fulltext_options.case_sensitive);

        // Unset the FULLTEXT index on b.
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder
            .alter(AlterKind::UnsetIndex {
                options: ApiUnsetIndexOptions::Fulltext {
                    column_name: "b".to_string(),
                },
            })
            .unwrap();
        let metadata = builder.build().unwrap();
        let a_fulltext_options = metadata
            .column_by_name("b")
            .unwrap()
            .column_schema
            .fulltext_options()
            .unwrap()
            .unwrap();
        // The options are still present: only `enable` is flipped, while the analyzer and
        // case sensitivity are preserved.
        assert!(!a_fulltext_options.enable);
        assert_eq!(
            datatypes::schema::FulltextAnalyzer::Chinese,
            a_fulltext_options.analyzer
        );
        assert!(a_fulltext_options.case_sensitive);
    }
1680
1681    #[test]
1682    fn test_add_if_not_exists() {
1683        // a (tag), b (field), c (ts)
1684        let metadata = build_test_region_metadata();
1685        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1686        // tag d
1687        builder
1688            .alter(AlterKind::AddColumns {
1689                columns: vec![
1690                    AddColumn {
1691                        column_metadata: new_column_metadata("d", true, 4),
1692                        location: None,
1693                    },
1694                    AddColumn {
1695                        column_metadata: new_column_metadata("d", true, 4),
1696                        location: None,
1697                    },
1698                ],
1699            })
1700            .unwrap();
1701        let metadata = builder.build().unwrap();
1702        check_columns(&metadata, &["a", "b", "c", "d"]);
1703        assert_eq!([1, 4], &metadata.primary_key[..]);
1704
1705        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1706        // field b.
1707        builder
1708            .alter(AlterKind::AddColumns {
1709                columns: vec![AddColumn {
1710                    column_metadata: new_column_metadata("b", false, 2),
1711                    location: None,
1712                }],
1713            })
1714            .unwrap();
1715        let metadata = builder.build().unwrap();
1716        check_columns(&metadata, &["a", "b", "c", "d"]);
1717    }
1718
1719    #[test]
1720    fn test_add_column_with_inverted_index() {
1721        // only set inverted index to true explicitly will this column be inverted indexed
1722
1723        // a (tag), b (field), c (ts)
1724        let metadata = build_test_region_metadata();
1725        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1726        // tag d, e
1727        let mut col = new_column_metadata("d", true, 4);
1728        col.column_schema.set_inverted_index(true);
1729        builder
1730            .alter(AlterKind::AddColumns {
1731                columns: vec![
1732                    AddColumn {
1733                        column_metadata: col,
1734                        location: None,
1735                    },
1736                    AddColumn {
1737                        column_metadata: new_column_metadata("e", true, 5),
1738                        location: None,
1739                    },
1740                ],
1741            })
1742            .unwrap();
1743        let metadata = builder.build().unwrap();
1744        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1745        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1746        let column_metadata = metadata.column_by_name("a").unwrap();
1747        assert!(!column_metadata.column_schema.is_inverted_indexed());
1748        let column_metadata = metadata.column_by_name("b").unwrap();
1749        assert!(!column_metadata.column_schema.is_inverted_indexed());
1750        let column_metadata = metadata.column_by_name("c").unwrap();
1751        assert!(!column_metadata.column_schema.is_inverted_indexed());
1752        let column_metadata = metadata.column_by_name("d").unwrap();
1753        assert!(column_metadata.column_schema.is_inverted_indexed());
1754        let column_metadata = metadata.column_by_name("e").unwrap();
1755        assert!(!column_metadata.column_schema.is_inverted_indexed());
1756    }
1757
1758    #[test]
1759    fn test_drop_if_exists() {
1760        // a (tag), b (field), c (ts)
1761        let metadata = build_test_region_metadata();
1762        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1763        // field d, e
1764        builder
1765            .alter(AlterKind::AddColumns {
1766                columns: vec![
1767                    AddColumn {
1768                        column_metadata: new_column_metadata("d", false, 4),
1769                        location: None,
1770                    },
1771                    AddColumn {
1772                        column_metadata: new_column_metadata("e", false, 5),
1773                        location: None,
1774                    },
1775                ],
1776            })
1777            .unwrap();
1778        let metadata = builder.build().unwrap();
1779        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1780
1781        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1782        builder
1783            .alter(AlterKind::DropColumns {
1784                names: vec!["b".to_string(), "b".to_string()],
1785            })
1786            .unwrap();
1787        let metadata = builder.build().unwrap();
1788        check_columns(&metadata, &["a", "c", "d", "e"]);
1789
1790        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1791        builder
1792            .alter(AlterKind::DropColumns {
1793                names: vec!["b".to_string(), "e".to_string()],
1794            })
1795            .unwrap();
1796        let metadata = builder.build().unwrap();
1797        check_columns(&metadata, &["a", "c", "d"]);
1798    }
1799
1800    #[test]
1801    fn test_invalid_column_name() {
1802        let mut builder = create_builder();
1803        builder.push_column_metadata(ColumnMetadata {
1804            column_schema: ColumnSchema::new(
1805                "__sequence",
1806                ConcreteDataType::timestamp_millisecond_datatype(),
1807                false,
1808            ),
1809            semantic_type: SemanticType::Timestamp,
1810            column_id: 1,
1811        });
1812        let err = builder.build().unwrap_err();
1813        assert!(
1814            err.to_string()
1815                .contains("internal column name that can not be used"),
1816            "unexpected err: {err}",
1817        );
1818    }
1819
1820    #[test]
1821    fn test_debug_for_column_metadata() {
1822        let region_metadata = build_test_region_metadata();
1823        let formatted = format!("{:?}", region_metadata);
1824        assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
1825    }
1826
1827    #[test]
1828    fn test_region_metadata_deserialize_default_primary_key_encoding() {
1829        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
1830        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1831        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
1832
1833        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
1834        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1835        assert_eq!(
1836            deserialized.primary_key_encoding,
1837            PrimaryKeyEncoding::Sparse
1838        );
1839    }
1840}