store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::column_def::try_as_column_schema;
25use api::v1::region::RegionColumnDef;
26use api::v1::SemanticType;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
33use datatypes::types::TimestampType;
34use serde::de::Error;
35use serde::{Deserialize, Deserializer, Serialize};
36use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
37
38use crate::codec::PrimaryKeyEncoding;
39use crate::region_request::{
40    AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
41};
42use crate::storage::consts::is_internal_column;
43use crate::storage::{ColumnId, RegionId};
44
/// Result type used throughout this module, with [MetadataError] as the error type.
pub type Result<T> = std::result::Result<T, MetadataError>;
46
/// Metadata of a column.
///
/// Combines the column's schema with its semantic type and its column id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Immutable and unique id of the column.
    pub column_id: ColumnId,
}
57
58impl fmt::Debug for ColumnMetadata {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        write!(
61            f,
62            "[{:?} {:?} {:?}]",
63            self.column_schema, self.semantic_type, self.column_id,
64        )
65    }
66}
67
68impl ColumnMetadata {
69    /// Construct `Self` from protobuf struct [RegionColumnDef]
70    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
71        let column_id = column_def.column_id;
72        let column_def = column_def
73            .column_def
74            .context(InvalidRawRegionRequestSnafu {
75                err: "column_def is absent",
76            })?;
77        let semantic_type = column_def.semantic_type();
78        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
79
80        Ok(Self {
81            column_schema,
82            semantic_type,
83            column_id,
84        })
85    }
86
87    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
88    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
89        serde_json::to_vec(columns)
90    }
91
92    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
93    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
94        serde_json::from_slice(bytes)
95    }
96
97    pub fn is_same_datatype(&self, other: &Self) -> bool {
98        self.column_schema.data_type == other.column_schema.data_type
99    }
100}
101
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    ///
    /// Not serialized; it is rebuilt from the column metadatas on deserialization.
    #[serde(skip)]
    pub schema: SchemaRef,

    // `time_index` and `id_to_index` are not public and are always constructed
    // via [SkippedFields], so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
157
158impl fmt::Debug for RegionMetadata {
159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160        f.debug_struct("RegionMetadata")
161            .field("column_metadatas", &self.column_metadatas)
162            .field("time_index", &self.time_index)
163            .field("primary_key", &self.primary_key)
164            .field("region_id", &self.region_id)
165            .field("schema_version", &self.schema_version)
166            .finish()
167    }
168}
169
/// Reference-counted ([Arc]) pointer to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
171
172impl<'de> Deserialize<'de> for RegionMetadata {
173    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
174    where
175        D: Deserializer<'de>,
176    {
177        // helper internal struct for deserialization
178        #[derive(Deserialize)]
179        struct RegionMetadataWithoutSchema {
180            column_metadatas: Vec<ColumnMetadata>,
181            primary_key: Vec<ColumnId>,
182            region_id: RegionId,
183            schema_version: u64,
184            #[serde(default)]
185            primary_key_encoding: PrimaryKeyEncoding,
186        }
187
188        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
189        let skipped =
190            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
191
192        Ok(Self {
193            schema: skipped.schema,
194            time_index: skipped.time_index,
195            id_to_index: skipped.id_to_index,
196            column_metadatas: without_schema.column_metadatas,
197            primary_key: without_schema.primary_key,
198            region_id: without_schema.region_id,
199            schema_version: without_schema.schema_version,
200            primary_key_encoding: without_schema.primary_key_encoding,
201        })
202    }
203}
204
205impl RegionMetadata {
206    /// Decode the metadata from a JSON str.
207    pub fn from_json(s: &str) -> Result<Self> {
208        serde_json::from_str(s).context(SerdeJsonSnafu)
209    }
210
211    /// Encode the metadata to a JSON string.
212    pub fn to_json(&self) -> Result<String> {
213        serde_json::to_string(&self).context(SerdeJsonSnafu)
214    }
215
216    /// Find column by id.
217    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
218        self.id_to_index
219            .get(&column_id)
220            .map(|index| &self.column_metadatas[*index])
221    }
222
223    /// Find column index by id.
224    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
225        self.id_to_index.get(&column_id).copied()
226    }
227
228    /// Find column index by name.
229    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
230        self.column_metadatas
231            .iter()
232            .position(|col| col.column_schema.name == column_name)
233    }
234
235    /// Returns the time index column
236    ///
237    /// # Panics
238    /// Panics if the time index column id is invalid.
239    pub fn time_index_column(&self) -> &ColumnMetadata {
240        let index = self.id_to_index[&self.time_index];
241        &self.column_metadatas[index]
242    }
243
244    /// Returns timestamp type of time index column
245    ///
246    /// # Panics
247    /// Panics if the time index column id is invalid.
248    pub fn time_index_type(&self) -> TimestampType {
249        let index = self.id_to_index[&self.time_index];
250        self.column_metadatas[index]
251            .column_schema
252            .data_type
253            .as_timestamp()
254            .unwrap()
255    }
256
    /// Returns the position of the time index column in
    /// [column_metadatas](RegionMetadata::column_metadatas).
    ///
    /// # Panics
    /// Panics if the time index column id is not present in the id-to-index map.
    pub fn time_index_column_pos(&self) -> usize {
        self.id_to_index[&self.time_index]
    }
261
262    /// Returns the arrow field of the time index column.
263    pub fn time_index_field(&self) -> FieldRef {
264        let index = self.id_to_index[&self.time_index];
265        self.schema.arrow_schema().fields[index].clone()
266    }
267
268    /// Finds a column by name.
269    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
270        self.schema
271            .column_index_by_name(name)
272            .map(|index| &self.column_metadatas[index])
273    }
274
275    /// Returns all primary key columns.
276    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
277        // safety: RegionMetadata::validate ensures every primary key exists.
278        self.primary_key
279            .iter()
280            .map(|id| self.column_by_id(*id).unwrap())
281    }
282
283    /// Returns all field columns before projection.
284    ///
285    /// **Use with caution**. On read path where might have projection, this method
286    /// can return columns that not present in data batch.
287    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288        self.column_metadatas
289            .iter()
290            .filter(|column| column.semantic_type == SemanticType::Field)
291    }
292
293    /// Returns a column's index in primary key if it is a primary key column.
294    ///
295    /// This does a linear search.
296    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
297        self.primary_key.iter().position(|id| *id == column_id)
298    }
299
300    /// Project the metadata to a new one using specified column ids.
301    ///
302    /// [RegionId] and schema version are preserved.
303    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
304        // check time index
305        ensure!(
306            projection.contains(&self.time_index),
307            TimeIndexNotFoundSnafu
308        );
309
310        // prepare new indices
311        let indices_to_preserve = projection
312            .iter()
313            .map(|id| {
314                self.column_index_by_id(*id)
315                    .with_context(|| InvalidRegionRequestSnafu {
316                        region_id: self.region_id,
317                        err: format!("column id {} not found", id),
318                    })
319            })
320            .collect::<Result<Vec<_>>>()?;
321
322        // project schema
323        let projected_schema =
324            self.schema
325                .try_project(&indices_to_preserve)
326                .with_context(|_| SchemaProjectSnafu {
327                    origin_schema: self.schema.clone(),
328                    projection: projection.to_vec(),
329                })?;
330
331        // project columns, generate projected primary key and new id_to_index
332        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
333        let mut projected_primary_key = vec![];
334        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
335        for index in indices_to_preserve {
336            let col = self.column_metadatas[index].clone();
337            if col.semantic_type == SemanticType::Tag {
338                projected_primary_key.push(col.column_id);
339            }
340            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
341            projected_column_metadatas.push(col);
342        }
343
344        Ok(RegionMetadata {
345            schema: Arc::new(projected_schema),
346            time_index: self.time_index,
347            id_to_index: projected_id_to_index,
348            column_metadatas: projected_column_metadatas,
349            primary_key: projected_primary_key,
350            region_id: self.region_id,
351            schema_version: self.schema_version,
352            primary_key_encoding: self.primary_key_encoding,
353        })
354    }
355
356    /// Gets the column ids to be indexed by inverted index.
357    pub fn inverted_indexed_column_ids<'a>(
358        &self,
359        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
360    ) -> HashSet<ColumnId> {
361        let mut inverted_index = self
362            .column_metadatas
363            .iter()
364            .filter(|column| column.column_schema.is_inverted_indexed())
365            .map(|column| column.column_id)
366            .collect::<HashSet<_>>();
367
368        for ignored in ignore_column_ids {
369            inverted_index.remove(ignored);
370        }
371
372        inverted_index
373    }
374
375    /// Checks whether the metadata is valid.
376    fn validate(&self) -> Result<()> {
377        // Id to name.
378        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
379        for col in &self.column_metadatas {
380            // Validate each column.
381            Self::validate_column_metadata(col)?;
382
383            // Check whether column id is duplicated. We already check column name
384            // is unique in `Schema` so we only check column id here.
385            ensure!(
386                !id_names.contains_key(&col.column_id),
387                InvalidMetaSnafu {
388                    reason: format!(
389                        "column {} and {} have the same column id {}",
390                        id_names[&col.column_id], col.column_schema.name, col.column_id,
391                    ),
392                }
393            );
394            id_names.insert(col.column_id, &col.column_schema.name);
395        }
396
397        // Checks there is only one time index.
398        let num_time_index = self
399            .column_metadatas
400            .iter()
401            .filter(|col| col.semantic_type == SemanticType::Timestamp)
402            .count();
403        ensure!(
404            num_time_index == 1,
405            InvalidMetaSnafu {
406                reason: format!("expect only one time index, found {}", num_time_index),
407            }
408        );
409
410        // Checks the time index column is not nullable.
411        ensure!(
412            !self.time_index_column().column_schema.is_nullable(),
413            InvalidMetaSnafu {
414                reason: format!(
415                    "time index column {} must be NOT NULL",
416                    self.time_index_column().column_schema.name
417                ),
418            }
419        );
420
421        if !self.primary_key.is_empty() {
422            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
423            // Checks column ids in the primary key is valid.
424            for column_id in &self.primary_key {
425                // Checks whether the column id exists.
426                ensure!(
427                    id_names.contains_key(column_id),
428                    InvalidMetaSnafu {
429                        reason: format!("unknown column id {}", column_id),
430                    }
431                );
432
433                // Safety: Column with specific id must exist.
434                let column = self.column_by_id(*column_id).unwrap();
435                // Checks duplicate.
436                ensure!(
437                    !pk_ids.contains(&column_id),
438                    InvalidMetaSnafu {
439                        reason: format!(
440                            "duplicate column {} in primary key",
441                            column.column_schema.name
442                        ),
443                    }
444                );
445
446                // Checks this is not a time index column.
447                ensure!(
448                    *column_id != self.time_index,
449                    InvalidMetaSnafu {
450                        reason: format!(
451                            "column {} is already a time index column",
452                            column.column_schema.name,
453                        ),
454                    }
455                );
456
457                // Checks semantic type.
458                ensure!(
459                    column.semantic_type == SemanticType::Tag,
460                    InvalidMetaSnafu {
461                        reason: format!(
462                            "semantic type of column {} should be Tag, not {:?}",
463                            column.column_schema.name, column.semantic_type
464                        ),
465                    }
466                );
467
468                pk_ids.insert(column_id);
469            }
470        }
471
472        // Checks tag semantic type.
473        let num_tag = self
474            .column_metadatas
475            .iter()
476            .filter(|col| col.semantic_type == SemanticType::Tag)
477            .count();
478        ensure!(
479            num_tag == self.primary_key.len(),
480            InvalidMetaSnafu {
481                reason: format!(
482                    "number of primary key columns {} not equal to tag columns {}",
483                    self.primary_key.len(),
484                    num_tag
485                ),
486            }
487        );
488
489        Ok(())
490    }
491
492    /// Checks whether it is a valid column.
493    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
494        if column_metadata.semantic_type == SemanticType::Timestamp {
495            ensure!(
496                column_metadata.column_schema.data_type.is_timestamp(),
497                InvalidMetaSnafu {
498                    reason: format!(
499                        "column `{}` is not timestamp type",
500                        column_metadata.column_schema.name
501                    ),
502                }
503            );
504        }
505
506        ensure!(
507            !is_internal_column(&column_metadata.column_schema.name),
508            InvalidMetaSnafu {
509                reason: format!(
510                    "{} is internal column name that can not be used",
511                    column_metadata.column_schema.name
512                ),
513            }
514        );
515
516        Ok(())
517    }
518}
519
/// Builder to build [RegionMetadata].
///
/// The builder collects the serialized fields of [RegionMetadata]; the derived
/// fields are computed and everything is validated in `build`.
pub struct RegionMetadataBuilder {
    /// Id of the region the metadata belongs to.
    region_id: RegionId,
    /// Columns of the region, in schema order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered list of column ids that form the primary key.
    primary_key: Vec<ColumnId>,
    /// Version of the region schema.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
}
528
529impl RegionMetadataBuilder {
530    /// Returns a new builder.
531    pub fn new(id: RegionId) -> Self {
532        Self {
533            region_id: id,
534            column_metadatas: vec![],
535            primary_key: vec![],
536            schema_version: 0,
537            primary_key_encoding: PrimaryKeyEncoding::Dense,
538        }
539    }
540
541    /// Creates a builder from existing [RegionMetadata].
542    pub fn from_existing(existing: RegionMetadata) -> Self {
543        Self {
544            column_metadatas: existing.column_metadatas,
545            primary_key: existing.primary_key,
546            region_id: existing.region_id,
547            schema_version: existing.schema_version,
548            primary_key_encoding: existing.primary_key_encoding,
549        }
550    }
551
    /// Sets the primary key encoding mode.
    ///
    /// Returns the builder so calls can be chained.
    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
        self.primary_key_encoding = encoding;
        self
    }
557
    /// Pushes a new column metadata to this region's metadata.
    ///
    /// The column is appended after the existing columns. The primary key is
    /// not changed by this method. Returns the builder so calls can be chained.
    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
        self.column_metadatas.push(column_metadata);
        self
    }
563
    /// Sets the primary key of the region.
    ///
    /// Replaces any previously set primary key with `key`.
    /// Returns the builder so calls can be chained.
    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
        self.primary_key = key;
        self
    }
569
    /// Increases the schema version by 1.
    ///
    /// Returns the builder so calls can be chained.
    pub fn bump_version(&mut self) -> &mut Self {
        self.schema_version += 1;
        self
    }
575
576    /// Applies the alter `kind` to the builder.
577    ///
578    /// The `kind` should be valid.
579    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
580        match kind {
581            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
582            AlterKind::DropColumns { names } => self.drop_columns(&names),
583            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
584            AlterKind::SetIndexes { options } => self.set_indexes(options)?,
585            AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
586            AlterKind::SetRegionOptions { options: _ } => {
587                // nothing to be done with RegionMetadata
588            }
589            AlterKind::UnsetRegionOptions { keys: _ } => {
590                // nothing to be done with RegionMetadata
591            }
592            AlterKind::DropDefaults { names } => {
593                self.drop_defaults(names)?;
594            }
595            AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
596            AlterKind::SyncColumns { column_metadatas } => {
597                self.primary_key = column_metadatas
598                    .iter()
599                    .filter_map(|column_metadata| {
600                        if column_metadata.semantic_type == SemanticType::Tag {
601                            Some(column_metadata.column_id)
602                        } else {
603                            None
604                        }
605                    })
606                    .collect::<Vec<_>>();
607                self.column_metadatas = column_metadatas;
608            }
609        }
610        Ok(self)
611    }
612
613    /// Consumes the builder and build a [RegionMetadata].
614    pub fn build(self) -> Result<RegionMetadata> {
615        let skipped = SkippedFields::new(&self.column_metadatas)?;
616
617        let meta = RegionMetadata {
618            schema: skipped.schema,
619            time_index: skipped.time_index,
620            id_to_index: skipped.id_to_index,
621            column_metadatas: self.column_metadatas,
622            primary_key: self.primary_key,
623            region_id: self.region_id,
624            schema_version: self.schema_version,
625            primary_key_encoding: self.primary_key_encoding,
626        };
627
628        meta.validate()?;
629
630        Ok(meta)
631    }
632
633    /// Adds columns to the metadata if not exist.
634    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
635        let mut names: HashSet<_> = self
636            .column_metadatas
637            .iter()
638            .map(|col| col.column_schema.name.clone())
639            .collect();
640
641        for add_column in columns {
642            if names.contains(&add_column.column_metadata.column_schema.name) {
643                // Column already exists.
644                continue;
645            }
646
647            let column_id = add_column.column_metadata.column_id;
648            let semantic_type = add_column.column_metadata.semantic_type;
649            let column_name = add_column.column_metadata.column_schema.name.clone();
650            match add_column.location {
651                None => {
652                    self.column_metadatas.push(add_column.column_metadata);
653                }
654                Some(AddColumnLocation::First) => {
655                    self.column_metadatas.insert(0, add_column.column_metadata);
656                }
657                Some(AddColumnLocation::After { column_name }) => {
658                    let pos = self
659                        .column_metadatas
660                        .iter()
661                        .position(|col| col.column_schema.name == column_name)
662                        .context(InvalidRegionRequestSnafu {
663                            region_id: self.region_id,
664                            err: format!(
665                                "column {} not found, failed to add column {} after it",
666                                column_name, add_column.column_metadata.column_schema.name
667                            ),
668                        })?;
669                    // Insert after pos.
670                    self.column_metadatas
671                        .insert(pos + 1, add_column.column_metadata);
672                }
673            }
674            names.insert(column_name);
675            if semantic_type == SemanticType::Tag {
676                // For a new tag, we extend the primary key.
677                self.primary_key.push(column_id);
678            }
679        }
680
681        Ok(())
682    }
683
684    /// Drops columns from the metadata if exist.
685    fn drop_columns(&mut self, names: &[String]) {
686        let name_set: HashSet<_> = names.iter().collect();
687        self.column_metadatas
688            .retain(|col| !name_set.contains(&col.column_schema.name));
689    }
690
691    /// Changes columns type to the metadata if exist.
692    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
693        let mut change_type_map: HashMap<_, _> = columns
694            .into_iter()
695            .map(
696                |ModifyColumnType {
697                     column_name,
698                     target_type,
699                 }| (column_name, target_type),
700            )
701            .collect();
702
703        for column_meta in self.column_metadatas.iter_mut() {
704            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
705                column_meta.column_schema.data_type = target_type.clone();
706                // also cast default value to target_type if default value exist
707                let new_default =
708                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
709                        Some(
710                            default_value
711                                .cast_to_datatype(&target_type)
712                                .with_context(|_| CastDefaultValueSnafu {
713                                    reason: format!(
714                                        "Failed to cast default value from {:?} to type {:?}",
715                                        default_value, target_type
716                                    ),
717                                })?,
718                        )
719                    } else {
720                        None
721                    };
722                column_meta.column_schema = column_meta
723                    .column_schema
724                    .clone()
725                    .with_default_constraint(new_default.clone())
726                    .with_context(|_| CastDefaultValueSnafu {
727                        reason: format!("Failed to set new default: {:?}", new_default),
728                    })?;
729            }
730        }
731
732        Ok(())
733    }
734
735    fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
736        let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
737        for option in &options {
738            set_index_map
739                .entry(option.column_name())
740                .or_default()
741                .push(option);
742        }
743
744        for column_metadata in self.column_metadatas.iter_mut() {
745            if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
746                for option in options {
747                    Self::set_index(column_metadata, option)?;
748                }
749            }
750        }
751
752        Ok(())
753    }
754
755    fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
756        let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
757        for option in &options {
758            unset_index_map
759                .entry(option.column_name())
760                .or_default()
761                .push(option);
762        }
763
764        for column_metadata in self.column_metadatas.iter_mut() {
765            if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
766                for option in options {
767                    Self::unset_index(column_metadata, option)?;
768                }
769            }
770        }
771
772        Ok(())
773    }
774
775    fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
776        match options {
777            SetIndexOption::Fulltext {
778                column_name,
779                options,
780            } => {
781                ensure!(
782                    column_metadata.column_schema.data_type.is_string(),
783                    InvalidColumnOptionSnafu {
784                        column_name,
785                        msg: "FULLTEXT index only supports string type".to_string(),
786                    }
787                );
788                let current_fulltext_options = column_metadata
789                    .column_schema
790                    .fulltext_options()
791                    .with_context(|_| GetFulltextOptionsSnafu {
792                        column_name: column_name.to_string(),
793                    })?;
794                set_column_fulltext_options(
795                    column_metadata,
796                    column_name,
797                    options,
798                    current_fulltext_options,
799                )?;
800            }
801            SetIndexOption::Inverted { .. } => {
802                column_metadata.column_schema.set_inverted_index(true)
803            }
804            SetIndexOption::Skipping {
805                column_name,
806                options,
807            } => {
808                column_metadata
809                    .column_schema
810                    .set_skipping_options(options)
811                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
812            }
813        }
814
815        Ok(())
816    }
817
818    fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
819        match options {
820            UnsetIndexOption::Fulltext { column_name } => {
821                ensure!(
822                    column_metadata.column_schema.data_type.is_string(),
823                    InvalidColumnOptionSnafu {
824                        column_name,
825                        msg: "FULLTEXT index only supports string type".to_string(),
826                    }
827                );
828
829                let current_fulltext_options = column_metadata
830                    .column_schema
831                    .fulltext_options()
832                    .with_context(|_| GetFulltextOptionsSnafu {
833                        column_name: column_name.to_string(),
834                    })?;
835
836                unset_column_fulltext_options(
837                    column_metadata,
838                    column_name,
839                    current_fulltext_options,
840                )?;
841            }
842            UnsetIndexOption::Inverted { .. } => {
843                column_metadata.column_schema.set_inverted_index(false)
844            }
845            UnsetIndexOption::Skipping { column_name } => {
846                column_metadata
847                    .column_schema
848                    .unset_skipping_options()
849                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
850            }
851        }
852
853        Ok(())
854    }
855
856    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
857        for name in column_names.iter() {
858            let meta = self
859                .column_metadatas
860                .iter_mut()
861                .find(|col| col.column_schema.name == *name);
862            if let Some(meta) = meta {
863                if !meta.column_schema.is_nullable() {
864                    return InvalidRegionRequestSnafu {
865                        region_id: self.region_id,
866                        err: format!(
867                            "column {name} is not nullable and `default` cannot be dropped",
868                        ),
869                    }
870                    .fail();
871                }
872                meta.column_schema = meta
873                    .column_schema
874                    .clone()
875                    .with_default_constraint(None)
876                    .with_context(|_| CastDefaultValueSnafu {
877                        reason: format!("Failed to drop default : {name:?}"),
878                    })?;
879            } else {
880                return InvalidRegionRequestSnafu {
881                    region_id: self.region_id,
882                    err: format!("column {name} not found",),
883                }
884                .fail();
885            }
886        }
887        Ok(())
888    }
889
890    fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
891        for set_default in set_defaults.iter() {
892            let meta = self
893                .column_metadatas
894                .iter_mut()
895                .find(|col| col.column_schema.name == set_default.name);
896            if let Some(meta) = meta {
897                let default_constraint = common_sql::convert::deserialize_default_constraint(
898                    set_default.default_constraint.as_slice(),
899                    &meta.column_schema.name,
900                    &meta.column_schema.data_type,
901                )
902                .context(SqlCommonSnafu)?;
903
904                meta.column_schema = meta
905                    .column_schema
906                    .clone()
907                    .with_default_constraint(default_constraint)
908                    .with_context(|_| CastDefaultValueSnafu {
909                        reason: format!("Failed to set default : {set_default:?}"),
910                    })?;
911            } else {
912                return InvalidRegionRequestSnafu {
913                    region_id: self.region_id,
914                    err: format!("column {} not found", set_default.name),
915                }
916                .fail();
917            }
918        }
919        Ok(())
920    }
921}
922
923/// Fields skipped in serialization.
924struct SkippedFields {
925    /// Last schema.
926    schema: SchemaRef,
927    /// Id of the time index column.
928    time_index: ColumnId,
929    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
930    id_to_index: HashMap<ColumnId, usize>,
931}
932
933impl SkippedFields {
934    /// Constructs skipped fields from `column_metadatas`.
935    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
936        let column_schemas = column_metadatas
937            .iter()
938            .map(|column_metadata| column_metadata.column_schema.clone())
939            .collect();
940        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
941        let time_index = column_metadatas
942            .iter()
943            .find_map(|col| {
944                if col.semantic_type == SemanticType::Timestamp {
945                    Some(col.column_id)
946                } else {
947                    None
948                }
949            })
950            .context(InvalidMetaSnafu {
951                reason: "time index not found",
952            })?;
953        let id_to_index = column_metadatas
954            .iter()
955            .enumerate()
956            .map(|(idx, col)| (col.column_id, idx))
957            .collect();
958
959        Ok(SkippedFields {
960            schema,
961            time_index,
962            id_to_index,
963        })
964    }
965}
966
967#[derive(Snafu)]
968#[snafu(visibility(pub))]
969#[stack_trace_debug]
970pub enum MetadataError {
971    #[snafu(display("Invalid schema"))]
972    InvalidSchema {
973        source: datatypes::error::Error,
974        #[snafu(implicit)]
975        location: Location,
976    },
977
978    #[snafu(display("Invalid metadata, {}", reason))]
979    InvalidMeta {
980        reason: String,
981        #[snafu(implicit)]
982        location: Location,
983    },
984
985    #[snafu(display("Failed to ser/de json object"))]
986    SerdeJson {
987        #[snafu(implicit)]
988        location: Location,
989        #[snafu(source)]
990        error: serde_json::Error,
991    },
992
993    #[snafu(display("Invalid raw region request, err: {}", err))]
994    InvalidRawRegionRequest {
995        err: String,
996        #[snafu(implicit)]
997        location: Location,
998    },
999
1000    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1001    InvalidRegionRequest {
1002        region_id: RegionId,
1003        err: String,
1004        #[snafu(implicit)]
1005        location: Location,
1006    },
1007
1008    #[snafu(display("Unexpected schema error during project"))]
1009    SchemaProject {
1010        origin_schema: SchemaRef,
1011        projection: Vec<ColumnId>,
1012        #[snafu(implicit)]
1013        location: Location,
1014        source: datatypes::Error,
1015    },
1016
1017    #[snafu(display("Time index column not found"))]
1018    TimeIndexNotFound {
1019        #[snafu(implicit)]
1020        location: Location,
1021    },
1022
1023    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1024    ChangeColumnNotFound {
1025        column_name: String,
1026        region_id: RegionId,
1027        #[snafu(implicit)]
1028        location: Location,
1029    },
1030
1031    #[snafu(display("Failed to convert column schema"))]
1032    ConvertColumnSchema {
1033        source: api::error::Error,
1034        #[snafu(implicit)]
1035        location: Location,
1036    },
1037
1038    #[snafu(display("Failed to convert TimeRanges"))]
1039    ConvertTimeRanges {
1040        source: api::error::Error,
1041        #[snafu(implicit)]
1042        location: Location,
1043    },
1044
1045    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1046    InvalidSetRegionOptionRequest {
1047        key: String,
1048        value: String,
1049        #[snafu(implicit)]
1050        location: Location,
1051    },
1052
1053    #[snafu(display("Invalid set region option request, key: {}", key))]
1054    InvalidUnsetRegionOptionRequest {
1055        key: String,
1056        #[snafu(implicit)]
1057        location: Location,
1058    },
1059
1060    #[snafu(display("Failed to decode protobuf"))]
1061    DecodeProto {
1062        #[snafu(source)]
1063        error: prost::UnknownEnumValue,
1064        #[snafu(implicit)]
1065        location: Location,
1066    },
1067
1068    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1069    InvalidColumnOption {
1070        column_name: String,
1071        msg: String,
1072        #[snafu(implicit)]
1073        location: Location,
1074    },
1075
1076    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1077    SetFulltextOptions {
1078        column_name: String,
1079        source: datatypes::Error,
1080        #[snafu(implicit)]
1081        location: Location,
1082    },
1083
1084    #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1085    GetFulltextOptions {
1086        column_name: String,
1087        source: datatypes::Error,
1088        #[snafu(implicit)]
1089        location: Location,
1090    },
1091
1092    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1093    SetSkippingIndexOptions {
1094        column_name: String,
1095        source: datatypes::Error,
1096        #[snafu(implicit)]
1097        location: Location,
1098    },
1099
1100    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1101    UnsetSkippingIndexOptions {
1102        column_name: String,
1103        source: datatypes::Error,
1104        #[snafu(implicit)]
1105        location: Location,
1106    },
1107
1108    #[snafu(display("Failed to decode arrow ipc record batches"))]
1109    DecodeArrowIpc {
1110        #[snafu(source)]
1111        error: arrow::error::ArrowError,
1112        #[snafu(implicit)]
1113        location: Location,
1114    },
1115
1116    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1117    CastDefaultValue {
1118        reason: String,
1119        source: datatypes::Error,
1120        #[snafu(implicit)]
1121        location: Location,
1122    },
1123
1124    #[snafu(display("Unexpected: {}", reason))]
1125    Unexpected {
1126        reason: String,
1127        #[snafu(implicit)]
1128        location: Location,
1129    },
1130
1131    #[snafu(display("Failed to encode/decode flight message"))]
1132    FlightCodec {
1133        source: common_grpc::Error,
1134        #[snafu(implicit)]
1135        location: Location,
1136    },
1137
1138    #[snafu(display("Invalid index option"))]
1139    InvalidIndexOption {
1140        #[snafu(implicit)]
1141        location: Location,
1142        #[snafu(source)]
1143        error: datatypes::error::Error,
1144    },
1145
1146    #[snafu(display("Sql common error"))]
1147    SqlCommon {
1148        source: common_sql::error::Error,
1149        #[snafu(implicit)]
1150        location: Location,
1151    },
1152}
1153
1154impl ErrorExt for MetadataError {
1155    fn status_code(&self) -> StatusCode {
1156        match self {
1157            Self::SqlCommon { source, .. } => source.status_code(),
1158            _ => StatusCode::InvalidArguments,
1159        }
1160    }
1161
1162    fn as_any(&self) -> &dyn Any {
1163        self
1164    }
1165}
1166
1167/// Set column fulltext options if it passed the validation.
1168///
1169/// Options allowed to modify:
1170/// * backend
1171///
1172/// Options not allowed to modify:
1173/// * analyzer
1174/// * case_sensitive
1175fn set_column_fulltext_options(
1176    column_meta: &mut ColumnMetadata,
1177    column_name: &str,
1178    options: &FulltextOptions,
1179    current_options: Option<FulltextOptions>,
1180) -> Result<()> {
1181    if let Some(current_options) = current_options {
1182        ensure!(
1183            current_options.analyzer == options.analyzer
1184                && current_options.case_sensitive == options.case_sensitive,
1185            InvalidColumnOptionSnafu {
1186                column_name,
1187                msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1188                current_options.analyzer, current_options.case_sensitive),
1189            }
1190        );
1191    }
1192
1193    column_meta
1194        .column_schema
1195        .set_fulltext_options(options)
1196        .context(SetFulltextOptionsSnafu { column_name })?;
1197
1198    Ok(())
1199}
1200
1201fn unset_column_fulltext_options(
1202    column_meta: &mut ColumnMetadata,
1203    column_name: &str,
1204    current_options: Option<FulltextOptions>,
1205) -> Result<()> {
1206    if let Some(mut current_options) = current_options
1207        && current_options.enable
1208    {
1209        current_options.enable = false;
1210        column_meta
1211            .column_schema
1212            .set_fulltext_options(&current_options)
1213            .context(SetFulltextOptionsSnafu { column_name })?;
1214    } else {
1215        return InvalidColumnOptionSnafu {
1216            column_name,
1217            msg: "FULLTEXT index already disabled",
1218        }
1219        .fail();
1220    }
1221
1222    Ok(())
1223}
1224
1225#[cfg(test)]
1226mod test {
1227    use datatypes::prelude::ConcreteDataType;
1228    use datatypes::schema::{
1229        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1230    };
1231    use datatypes::value::Value;
1232
1233    use super::*;
1234
1235    fn create_builder() -> RegionMetadataBuilder {
1236        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1237    }
1238
1239    fn build_test_region_metadata() -> RegionMetadata {
1240        let mut builder = create_builder();
1241        builder
1242            .push_column_metadata(ColumnMetadata {
1243                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1244                semantic_type: SemanticType::Tag,
1245                column_id: 1,
1246            })
1247            .push_column_metadata(ColumnMetadata {
1248                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1249                semantic_type: SemanticType::Field,
1250                column_id: 2,
1251            })
1252            .push_column_metadata(ColumnMetadata {
1253                column_schema: ColumnSchema::new(
1254                    "c",
1255                    ConcreteDataType::timestamp_millisecond_datatype(),
1256                    false,
1257                ),
1258                semantic_type: SemanticType::Timestamp,
1259                column_id: 3,
1260            })
1261            .primary_key(vec![1]);
1262        builder.build().unwrap()
1263    }
1264
1265    #[test]
1266    fn test_region_metadata() {
1267        let region_metadata = build_test_region_metadata();
1268        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1269        assert_eq!(
1270            "a",
1271            region_metadata.column_by_id(1).unwrap().column_schema.name
1272        );
1273        assert_eq!(None, region_metadata.column_by_id(10));
1274    }
1275
1276    #[test]
1277    fn test_region_metadata_serde() {
1278        let region_metadata = build_test_region_metadata();
1279        let serialized = serde_json::to_string(&region_metadata).unwrap();
1280        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1281        assert_eq!(region_metadata, deserialized);
1282    }
1283
1284    #[test]
1285    fn test_column_metadata_validate() {
1286        let mut builder = create_builder();
1287        let col = ColumnMetadata {
1288            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1289            semantic_type: SemanticType::Timestamp,
1290            column_id: 1,
1291        };
1292
1293        builder.push_column_metadata(col);
1294        let err = builder.build().unwrap_err();
1295        assert!(
1296            err.to_string()
1297                .contains("column `ts` is not timestamp type"),
1298            "unexpected err: {err}",
1299        );
1300    }
1301
1302    #[test]
1303    fn test_empty_region_metadata() {
1304        let builder = create_builder();
1305        let err = builder.build().unwrap_err();
1306        // A region must have a time index.
1307        assert!(
1308            err.to_string().contains("time index not found"),
1309            "unexpected err: {err}",
1310        );
1311    }
1312
1313    #[test]
1314    fn test_same_column_id() {
1315        let mut builder = create_builder();
1316        builder
1317            .push_column_metadata(ColumnMetadata {
1318                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1319                semantic_type: SemanticType::Tag,
1320                column_id: 1,
1321            })
1322            .push_column_metadata(ColumnMetadata {
1323                column_schema: ColumnSchema::new(
1324                    "b",
1325                    ConcreteDataType::timestamp_millisecond_datatype(),
1326                    false,
1327                ),
1328                semantic_type: SemanticType::Timestamp,
1329                column_id: 1,
1330            });
1331        let err = builder.build().unwrap_err();
1332        assert!(
1333            err.to_string()
1334                .contains("column a and b have the same column id"),
1335            "unexpected err: {err}",
1336        );
1337    }
1338
1339    #[test]
1340    fn test_duplicate_time_index() {
1341        let mut builder = create_builder();
1342        builder
1343            .push_column_metadata(ColumnMetadata {
1344                column_schema: ColumnSchema::new(
1345                    "a",
1346                    ConcreteDataType::timestamp_millisecond_datatype(),
1347                    false,
1348                ),
1349                semantic_type: SemanticType::Timestamp,
1350                column_id: 1,
1351            })
1352            .push_column_metadata(ColumnMetadata {
1353                column_schema: ColumnSchema::new(
1354                    "b",
1355                    ConcreteDataType::timestamp_millisecond_datatype(),
1356                    false,
1357                ),
1358                semantic_type: SemanticType::Timestamp,
1359                column_id: 2,
1360            });
1361        let err = builder.build().unwrap_err();
1362        assert!(
1363            err.to_string().contains("expect only one time index"),
1364            "unexpected err: {err}",
1365        );
1366    }
1367
1368    #[test]
1369    fn test_unknown_primary_key() {
1370        let mut builder = create_builder();
1371        builder
1372            .push_column_metadata(ColumnMetadata {
1373                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1374                semantic_type: SemanticType::Tag,
1375                column_id: 1,
1376            })
1377            .push_column_metadata(ColumnMetadata {
1378                column_schema: ColumnSchema::new(
1379                    "b",
1380                    ConcreteDataType::timestamp_millisecond_datatype(),
1381                    false,
1382                ),
1383                semantic_type: SemanticType::Timestamp,
1384                column_id: 2,
1385            })
1386            .primary_key(vec![3]);
1387        let err = builder.build().unwrap_err();
1388        assert!(
1389            err.to_string().contains("unknown column id 3"),
1390            "unexpected err: {err}",
1391        );
1392    }
1393
1394    #[test]
1395    fn test_same_primary_key() {
1396        let mut builder = create_builder();
1397        builder
1398            .push_column_metadata(ColumnMetadata {
1399                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1400                semantic_type: SemanticType::Tag,
1401                column_id: 1,
1402            })
1403            .push_column_metadata(ColumnMetadata {
1404                column_schema: ColumnSchema::new(
1405                    "b",
1406                    ConcreteDataType::timestamp_millisecond_datatype(),
1407                    false,
1408                ),
1409                semantic_type: SemanticType::Timestamp,
1410                column_id: 2,
1411            })
1412            .primary_key(vec![1, 1]);
1413        let err = builder.build().unwrap_err();
1414        assert!(
1415            err.to_string()
1416                .contains("duplicate column a in primary key"),
1417            "unexpected err: {err}",
1418        );
1419    }
1420
1421    #[test]
1422    fn test_in_time_index() {
1423        let mut builder = create_builder();
1424        builder
1425            .push_column_metadata(ColumnMetadata {
1426                column_schema: ColumnSchema::new(
1427                    "ts",
1428                    ConcreteDataType::timestamp_millisecond_datatype(),
1429                    false,
1430                ),
1431                semantic_type: SemanticType::Timestamp,
1432                column_id: 1,
1433            })
1434            .primary_key(vec![1]);
1435        let err = builder.build().unwrap_err();
1436        assert!(
1437            err.to_string()
1438                .contains("column ts is already a time index column"),
1439            "unexpected err: {err}",
1440        );
1441    }
1442
1443    #[test]
1444    fn test_nullable_time_index() {
1445        let mut builder = create_builder();
1446        builder.push_column_metadata(ColumnMetadata {
1447            column_schema: ColumnSchema::new(
1448                "ts",
1449                ConcreteDataType::timestamp_millisecond_datatype(),
1450                true,
1451            ),
1452            semantic_type: SemanticType::Timestamp,
1453            column_id: 1,
1454        });
1455        let err = builder.build().unwrap_err();
1456        assert!(
1457            err.to_string()
1458                .contains("time index column ts must be NOT NULL"),
1459            "unexpected err: {err}",
1460        );
1461    }
1462
1463    #[test]
1464    fn test_primary_key_semantic_type() {
1465        let mut builder = create_builder();
1466        builder
1467            .push_column_metadata(ColumnMetadata {
1468                column_schema: ColumnSchema::new(
1469                    "ts",
1470                    ConcreteDataType::timestamp_millisecond_datatype(),
1471                    false,
1472                ),
1473                semantic_type: SemanticType::Timestamp,
1474                column_id: 1,
1475            })
1476            .push_column_metadata(ColumnMetadata {
1477                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1478                semantic_type: SemanticType::Field,
1479                column_id: 2,
1480            })
1481            .primary_key(vec![2]);
1482        let err = builder.build().unwrap_err();
1483        assert!(
1484            err.to_string()
1485                .contains("semantic type of column a should be Tag, not Field"),
1486            "unexpected err: {err}",
1487        );
1488    }
1489
1490    #[test]
1491    fn test_primary_key_tag_num() {
1492        let mut builder = create_builder();
1493        builder
1494            .push_column_metadata(ColumnMetadata {
1495                column_schema: ColumnSchema::new(
1496                    "ts",
1497                    ConcreteDataType::timestamp_millisecond_datatype(),
1498                    false,
1499                ),
1500                semantic_type: SemanticType::Timestamp,
1501                column_id: 1,
1502            })
1503            .push_column_metadata(ColumnMetadata {
1504                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1505                semantic_type: SemanticType::Tag,
1506                column_id: 2,
1507            })
1508            .push_column_metadata(ColumnMetadata {
1509                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1510                semantic_type: SemanticType::Tag,
1511                column_id: 3,
1512            })
1513            .primary_key(vec![2]);
1514        let err = builder.build().unwrap_err();
1515        assert!(
1516            err.to_string()
1517                .contains("number of primary key columns 1 not equal to tag columns 2"),
1518            "unexpected err: {err}",
1519        );
1520    }
1521
1522    #[test]
1523    fn test_bump_version() {
1524        let mut region_metadata = build_test_region_metadata();
1525        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1526        builder.bump_version();
1527        let new_meta = builder.build().unwrap();
1528        region_metadata.schema_version += 1;
1529        assert_eq!(region_metadata, new_meta);
1530    }
1531
1532    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1533        let semantic_type = if is_tag {
1534            SemanticType::Tag
1535        } else {
1536            SemanticType::Field
1537        };
1538        ColumnMetadata {
1539            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1540            semantic_type,
1541            column_id,
1542        }
1543    }
1544
1545    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1546        let actual: Vec<_> = metadata
1547            .column_metadatas
1548            .iter()
1549            .map(|col| &col.column_schema.name)
1550            .collect();
1551        assert_eq!(names, actual);
1552    }
1553
1554    fn get_columns_default_constraint(
1555        metadata: &RegionMetadata,
1556        name: String,
1557    ) -> Option<Option<&ColumnDefaultConstraint>> {
1558        metadata.column_metadatas.iter().find_map(|col| {
1559            if col.column_schema.name == name {
1560                Some(col.column_schema.default_constraint())
1561            } else {
1562                None
1563            }
1564        })
1565    }
1566
1567    #[test]
1568    fn test_alter() {
1569        // a (tag), b (field), c (ts)
1570        let metadata = build_test_region_metadata();
1571        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1572        // tag d
1573        builder
1574            .alter(AlterKind::AddColumns {
1575                columns: vec![AddColumn {
1576                    column_metadata: new_column_metadata("d", true, 4),
1577                    location: None,
1578                }],
1579            })
1580            .unwrap();
1581        let metadata = builder.build().unwrap();
1582        check_columns(&metadata, &["a", "b", "c", "d"]);
1583        assert_eq!([1, 4], &metadata.primary_key[..]);
1584
1585        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1586        builder
1587            .alter(AlterKind::AddColumns {
1588                columns: vec![AddColumn {
1589                    column_metadata: new_column_metadata("e", false, 5),
1590                    location: Some(AddColumnLocation::First),
1591                }],
1592            })
1593            .unwrap();
1594        let metadata = builder.build().unwrap();
1595        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1596
1597        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1598        builder
1599            .alter(AlterKind::AddColumns {
1600                columns: vec![AddColumn {
1601                    column_metadata: new_column_metadata("f", false, 6),
1602                    location: Some(AddColumnLocation::After {
1603                        column_name: "b".to_string(),
1604                    }),
1605                }],
1606            })
1607            .unwrap();
1608        let metadata = builder.build().unwrap();
1609        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1610
1611        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1612        builder
1613            .alter(AlterKind::AddColumns {
1614                columns: vec![AddColumn {
1615                    column_metadata: new_column_metadata("g", false, 7),
1616                    location: Some(AddColumnLocation::After {
1617                        column_name: "d".to_string(),
1618                    }),
1619                }],
1620            })
1621            .unwrap();
1622        let metadata = builder.build().unwrap();
1623        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1624
1625        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1626        builder
1627            .alter(AlterKind::DropColumns {
1628                names: vec!["g".to_string(), "e".to_string()],
1629            })
1630            .unwrap();
1631        let metadata = builder.build().unwrap();
1632        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1633
1634        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1635        builder
1636            .alter(AlterKind::DropColumns {
1637                names: vec!["a".to_string()],
1638            })
1639            .unwrap();
1640        // Build returns error as the primary key contains a.
1641        let err = builder.build().unwrap_err();
1642        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1643
1644        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1645        let mut column_metadata = new_column_metadata("g", false, 8);
1646        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1647        column_metadata.column_schema = column_metadata
1648            .column_schema
1649            .with_default_constraint(default_constraint.clone())
1650            .unwrap();
1651        builder
1652            .alter(AlterKind::AddColumns {
1653                columns: vec![AddColumn {
1654                    column_metadata,
1655                    location: None,
1656                }],
1657            })
1658            .unwrap();
1659        let metadata = builder.build().unwrap();
1660        assert_eq!(
1661            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1662            default_constraint.as_ref()
1663        );
1664        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1665
1666        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1667        builder
1668            .alter(AlterKind::DropDefaults {
1669                names: vec!["g".to_string()],
1670            })
1671            .unwrap();
1672        let metadata = builder.build().unwrap();
1673        assert_eq!(
1674            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1675            None
1676        );
1677        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1678
1679        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1680        builder
1681            .alter(AlterKind::DropColumns {
1682                names: vec!["g".to_string()],
1683            })
1684            .unwrap();
1685        let metadata = builder.build().unwrap();
1686        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1687
1688        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1689        builder
1690            .alter(AlterKind::ModifyColumnTypes {
1691                columns: vec![ModifyColumnType {
1692                    column_name: "b".to_string(),
1693                    target_type: ConcreteDataType::string_datatype(),
1694                }],
1695            })
1696            .unwrap();
1697        let metadata = builder.build().unwrap();
1698        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1699        let b_type = &metadata
1700            .column_by_name("b")
1701            .unwrap()
1702            .column_schema
1703            .data_type;
1704        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1705
1706        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1707        builder
1708            .alter(AlterKind::SetIndexes {
1709                options: vec![SetIndexOption::Fulltext {
1710                    column_name: "b".to_string(),
1711                    options: FulltextOptions::new_unchecked(
1712                        true,
1713                        FulltextAnalyzer::Chinese,
1714                        true,
1715                        FulltextBackend::Bloom,
1716                        1000,
1717                        0.01,
1718                    ),
1719                }],
1720            })
1721            .unwrap();
1722        let metadata = builder.build().unwrap();
1723        let a_fulltext_options = metadata
1724            .column_by_name("b")
1725            .unwrap()
1726            .column_schema
1727            .fulltext_options()
1728            .unwrap()
1729            .unwrap();
1730        assert!(a_fulltext_options.enable);
1731        assert_eq!(
1732            datatypes::schema::FulltextAnalyzer::Chinese,
1733            a_fulltext_options.analyzer
1734        );
1735        assert!(a_fulltext_options.case_sensitive);
1736
1737        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1738        builder
1739            .alter(AlterKind::UnsetIndexes {
1740                options: vec![UnsetIndexOption::Fulltext {
1741                    column_name: "b".to_string(),
1742                }],
1743            })
1744            .unwrap();
1745        let metadata = builder.build().unwrap();
1746        let a_fulltext_options = metadata
1747            .column_by_name("b")
1748            .unwrap()
1749            .column_schema
1750            .fulltext_options()
1751            .unwrap()
1752            .unwrap();
1753        assert!(!a_fulltext_options.enable);
1754        assert_eq!(
1755            datatypes::schema::FulltextAnalyzer::Chinese,
1756            a_fulltext_options.analyzer
1757        );
1758        assert!(a_fulltext_options.case_sensitive);
1759    }
1760
1761    #[test]
1762    fn test_add_if_not_exists() {
1763        // a (tag), b (field), c (ts)
1764        let metadata = build_test_region_metadata();
1765        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1766        // tag d
1767        builder
1768            .alter(AlterKind::AddColumns {
1769                columns: vec![
1770                    AddColumn {
1771                        column_metadata: new_column_metadata("d", true, 4),
1772                        location: None,
1773                    },
1774                    AddColumn {
1775                        column_metadata: new_column_metadata("d", true, 4),
1776                        location: None,
1777                    },
1778                ],
1779            })
1780            .unwrap();
1781        let metadata = builder.build().unwrap();
1782        check_columns(&metadata, &["a", "b", "c", "d"]);
1783        assert_eq!([1, 4], &metadata.primary_key[..]);
1784
1785        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1786        // field b.
1787        builder
1788            .alter(AlterKind::AddColumns {
1789                columns: vec![AddColumn {
1790                    column_metadata: new_column_metadata("b", false, 2),
1791                    location: None,
1792                }],
1793            })
1794            .unwrap();
1795        let metadata = builder.build().unwrap();
1796        check_columns(&metadata, &["a", "b", "c", "d"]);
1797    }
1798
1799    #[test]
1800    fn test_add_column_with_inverted_index() {
1801        // only set inverted index to true explicitly will this column be inverted indexed
1802
1803        // a (tag), b (field), c (ts)
1804        let metadata = build_test_region_metadata();
1805        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1806        // tag d, e
1807        let mut col = new_column_metadata("d", true, 4);
1808        col.column_schema.set_inverted_index(true);
1809        builder
1810            .alter(AlterKind::AddColumns {
1811                columns: vec![
1812                    AddColumn {
1813                        column_metadata: col,
1814                        location: None,
1815                    },
1816                    AddColumn {
1817                        column_metadata: new_column_metadata("e", true, 5),
1818                        location: None,
1819                    },
1820                ],
1821            })
1822            .unwrap();
1823        let metadata = builder.build().unwrap();
1824        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1825        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1826        let column_metadata = metadata.column_by_name("a").unwrap();
1827        assert!(!column_metadata.column_schema.is_inverted_indexed());
1828        let column_metadata = metadata.column_by_name("b").unwrap();
1829        assert!(!column_metadata.column_schema.is_inverted_indexed());
1830        let column_metadata = metadata.column_by_name("c").unwrap();
1831        assert!(!column_metadata.column_schema.is_inverted_indexed());
1832        let column_metadata = metadata.column_by_name("d").unwrap();
1833        assert!(column_metadata.column_schema.is_inverted_indexed());
1834        let column_metadata = metadata.column_by_name("e").unwrap();
1835        assert!(!column_metadata.column_schema.is_inverted_indexed());
1836    }
1837
1838    #[test]
1839    fn test_drop_if_exists() {
1840        // a (tag), b (field), c (ts)
1841        let metadata = build_test_region_metadata();
1842        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1843        // field d, e
1844        builder
1845            .alter(AlterKind::AddColumns {
1846                columns: vec![
1847                    AddColumn {
1848                        column_metadata: new_column_metadata("d", false, 4),
1849                        location: None,
1850                    },
1851                    AddColumn {
1852                        column_metadata: new_column_metadata("e", false, 5),
1853                        location: None,
1854                    },
1855                ],
1856            })
1857            .unwrap();
1858        let metadata = builder.build().unwrap();
1859        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1860
1861        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1862        builder
1863            .alter(AlterKind::DropColumns {
1864                names: vec!["b".to_string(), "b".to_string()],
1865            })
1866            .unwrap();
1867        let metadata = builder.build().unwrap();
1868        check_columns(&metadata, &["a", "c", "d", "e"]);
1869
1870        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1871        builder
1872            .alter(AlterKind::DropColumns {
1873                names: vec!["b".to_string(), "e".to_string()],
1874            })
1875            .unwrap();
1876        let metadata = builder.build().unwrap();
1877        check_columns(&metadata, &["a", "c", "d"]);
1878    }
1879
1880    #[test]
1881    fn test_invalid_column_name() {
1882        let mut builder = create_builder();
1883        builder.push_column_metadata(ColumnMetadata {
1884            column_schema: ColumnSchema::new(
1885                "__sequence",
1886                ConcreteDataType::timestamp_millisecond_datatype(),
1887                false,
1888            ),
1889            semantic_type: SemanticType::Timestamp,
1890            column_id: 1,
1891        });
1892        let err = builder.build().unwrap_err();
1893        assert!(
1894            err.to_string()
1895                .contains("internal column name that can not be used"),
1896            "unexpected err: {err}",
1897        );
1898    }
1899
1900    #[test]
1901    fn test_debug_for_column_metadata() {
1902        let region_metadata = build_test_region_metadata();
1903        let formatted = format!("{:?}", region_metadata);
1904        assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
1905    }
1906
1907    #[test]
1908    fn test_region_metadata_deserialize_default_primary_key_encoding() {
1909        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
1910        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1911        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
1912
1913        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
1914        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1915        assert_eq!(
1916            deserialized.primary_key_encoding,
1917            PrimaryKeyEncoding::Sparse
1918        );
1919    }
1920}