1use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
33use datatypes::types::TimestampType;
34use itertools::Itertools;
35use serde::de::Error;
36use serde::{Deserialize, Deserializer, Serialize};
37use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
38
39use crate::codec::PrimaryKeyEncoding;
40use crate::region_request::{
41 AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
46pub type Result<T> = std::result::Result<T, MetadataError>;
47
48#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct ColumnMetadata {
51 pub column_schema: ColumnSchema,
53 pub semantic_type: SemanticType,
55 pub column_id: ColumnId,
57}
58
59impl fmt::Debug for ColumnMetadata {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 write!(
62 f,
63 "[{:?} {:?} {:?}]",
64 self.column_schema, self.semantic_type, self.column_id,
65 )
66 }
67}
68
69impl ColumnMetadata {
70 pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72 let column_id = column_def.column_id;
73 let column_def = column_def
74 .column_def
75 .context(InvalidRawRegionRequestSnafu {
76 err: "column_def is absent",
77 })?;
78 let semantic_type = column_def.semantic_type();
79 let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81 Ok(Self {
82 column_schema,
83 semantic_type,
84 column_id,
85 })
86 }
87
88 pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90 serde_json::to_vec(columns)
91 }
92
93 pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95 serde_json::from_slice(bytes)
96 }
97
98 pub fn is_same_datatype(&self, other: &Self) -> bool {
99 self.column_schema.data_type == other.column_schema.data_type
100 }
101}
102
103#[cfg_attr(doc, aquamarine::aquamarine)]
104#[derive(Clone, PartialEq, Eq, Serialize)]
128pub struct RegionMetadata {
129 #[serde(skip)]
131 pub schema: SchemaRef,
132
133 #[serde(skip)]
137 time_index: ColumnId,
138 #[serde(skip)]
140 id_to_index: HashMap<ColumnId, usize>,
141
142 pub column_metadatas: Vec<ColumnMetadata>,
145 pub primary_key: Vec<ColumnId>,
147
148 pub region_id: RegionId,
150 pub schema_version: u64,
154
155 pub primary_key_encoding: PrimaryKeyEncoding,
157
158 pub partition_expr: Option<String>,
163}
164
165impl fmt::Debug for RegionMetadata {
166 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167 f.debug_struct("RegionMetadata")
168 .field("column_metadatas", &self.column_metadatas)
169 .field("time_index", &self.time_index)
170 .field("primary_key", &self.primary_key)
171 .field("region_id", &self.region_id)
172 .field("schema_version", &self.schema_version)
173 .field("partition_expr", &self.partition_expr)
174 .finish()
175 }
176}
177
178pub type RegionMetadataRef = Arc<RegionMetadata>;
179
180impl<'de> Deserialize<'de> for RegionMetadata {
181 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
182 where
183 D: Deserializer<'de>,
184 {
185 #[derive(Deserialize)]
187 struct RegionMetadataWithoutSchema {
188 column_metadatas: Vec<ColumnMetadata>,
189 primary_key: Vec<ColumnId>,
190 region_id: RegionId,
191 schema_version: u64,
192 #[serde(default)]
193 primary_key_encoding: PrimaryKeyEncoding,
194 #[serde(default)]
195 partition_expr: Option<String>,
196 }
197
198 let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
199 let skipped =
200 SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
201
202 Ok(Self {
203 schema: skipped.schema,
204 time_index: skipped.time_index,
205 id_to_index: skipped.id_to_index,
206 column_metadatas: without_schema.column_metadatas,
207 primary_key: without_schema.primary_key,
208 region_id: without_schema.region_id,
209 schema_version: without_schema.schema_version,
210 primary_key_encoding: without_schema.primary_key_encoding,
211 partition_expr: without_schema.partition_expr,
212 })
213 }
214}
215
216impl RegionMetadata {
217 pub fn from_json(s: &str) -> Result<Self> {
219 serde_json::from_str(s).context(SerdeJsonSnafu)
220 }
221
222 pub fn to_json(&self) -> Result<String> {
224 serde_json::to_string(&self).context(SerdeJsonSnafu)
225 }
226
227 pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
229 self.id_to_index
230 .get(&column_id)
231 .map(|index| &self.column_metadatas[*index])
232 }
233
234 pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
236 self.id_to_index.get(&column_id).copied()
237 }
238
239 pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
241 self.column_metadatas
242 .iter()
243 .position(|col| col.column_schema.name == column_name)
244 }
245
246 pub fn time_index_column(&self) -> &ColumnMetadata {
251 let index = self.id_to_index[&self.time_index];
252 &self.column_metadatas[index]
253 }
254
255 pub fn time_index_type(&self) -> TimestampType {
260 let index = self.id_to_index[&self.time_index];
261 self.column_metadatas[index]
262 .column_schema
263 .data_type
264 .as_timestamp()
265 .unwrap()
266 }
267
268 pub fn time_index_column_pos(&self) -> usize {
270 self.id_to_index[&self.time_index]
271 }
272
273 pub fn time_index_field(&self) -> FieldRef {
275 let index = self.id_to_index[&self.time_index];
276 self.schema.arrow_schema().fields[index].clone()
277 }
278
279 pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
281 self.schema
282 .column_index_by_name(name)
283 .map(|index| &self.column_metadatas[index])
284 }
285
286 pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288 self.primary_key
290 .iter()
291 .map(|id| self.column_by_id(*id).unwrap())
292 }
293
294 pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
299 self.column_metadatas
300 .iter()
301 .filter(|column| column.semantic_type == SemanticType::Field)
302 }
303
304 pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
308 self.primary_key.iter().position(|id| *id == column_id)
309 }
310
311 pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
315 ensure!(
317 projection.contains(&self.time_index),
318 TimeIndexNotFoundSnafu
319 );
320
321 let indices_to_preserve = projection
323 .iter()
324 .map(|id| {
325 self.column_index_by_id(*id)
326 .with_context(|| InvalidRegionRequestSnafu {
327 region_id: self.region_id,
328 err: format!("column id {} not found", id),
329 })
330 })
331 .collect::<Result<Vec<_>>>()?;
332
333 let projected_schema =
335 self.schema
336 .try_project(&indices_to_preserve)
337 .with_context(|_| SchemaProjectSnafu {
338 origin_schema: self.schema.clone(),
339 projection: projection.to_vec(),
340 })?;
341
342 let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
344 let mut projected_primary_key = vec![];
345 let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
346 for index in indices_to_preserve {
347 let col = self.column_metadatas[index].clone();
348 if col.semantic_type == SemanticType::Tag {
349 projected_primary_key.push(col.column_id);
350 }
351 projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
352 projected_column_metadatas.push(col);
353 }
354
355 Ok(RegionMetadata {
356 schema: Arc::new(projected_schema),
357 time_index: self.time_index,
358 id_to_index: projected_id_to_index,
359 column_metadatas: projected_column_metadatas,
360 primary_key: projected_primary_key,
361 region_id: self.region_id,
362 schema_version: self.schema_version,
363 primary_key_encoding: self.primary_key_encoding,
364 partition_expr: self.partition_expr.clone(),
365 })
366 }
367
368 pub fn inverted_indexed_column_ids<'a>(
370 &self,
371 ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
372 ) -> HashSet<ColumnId> {
373 let mut inverted_index = self
374 .column_metadatas
375 .iter()
376 .filter(|column| column.column_schema.is_inverted_indexed())
377 .map(|column| column.column_id)
378 .collect::<HashSet<_>>();
379
380 for ignored in ignore_column_ids {
381 inverted_index.remove(ignored);
382 }
383
384 inverted_index
385 }
386
387 fn validate(&self) -> Result<()> {
389 let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
391 for col in &self.column_metadatas {
392 Self::validate_column_metadata(col)?;
394
395 ensure!(
398 !id_names.contains_key(&col.column_id),
399 InvalidMetaSnafu {
400 reason: format!(
401 "column {} and {} have the same column id {}",
402 id_names[&col.column_id], col.column_schema.name, col.column_id,
403 ),
404 }
405 );
406 id_names.insert(col.column_id, &col.column_schema.name);
407 }
408
409 let time_indexes = self
411 .column_metadatas
412 .iter()
413 .filter(|col| col.semantic_type == SemanticType::Timestamp)
414 .collect::<Vec<_>>();
415 ensure!(
416 time_indexes.len() == 1,
417 InvalidMetaSnafu {
418 reason: format!(
419 "expect only one time index, found {}: {}",
420 time_indexes.len(),
421 time_indexes
422 .iter()
423 .map(|c| &c.column_schema.name)
424 .join(", ")
425 ),
426 }
427 );
428
429 ensure!(
431 !self.time_index_column().column_schema.is_nullable(),
432 InvalidMetaSnafu {
433 reason: format!(
434 "time index column {} must be NOT NULL",
435 self.time_index_column().column_schema.name
436 ),
437 }
438 );
439
440 if !self.primary_key.is_empty() {
441 let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
442 for column_id in &self.primary_key {
444 ensure!(
446 id_names.contains_key(column_id),
447 InvalidMetaSnafu {
448 reason: format!("unknown column id {}", column_id),
449 }
450 );
451
452 let column = self.column_by_id(*column_id).unwrap();
454 ensure!(
456 !pk_ids.contains(&column_id),
457 InvalidMetaSnafu {
458 reason: format!(
459 "duplicate column {} in primary key",
460 column.column_schema.name
461 ),
462 }
463 );
464
465 ensure!(
467 *column_id != self.time_index,
468 InvalidMetaSnafu {
469 reason: format!(
470 "column {} is already a time index column",
471 column.column_schema.name,
472 ),
473 }
474 );
475
476 ensure!(
478 column.semantic_type == SemanticType::Tag,
479 InvalidMetaSnafu {
480 reason: format!(
481 "semantic type of column {} should be Tag, not {:?}",
482 column.column_schema.name, column.semantic_type
483 ),
484 }
485 );
486
487 pk_ids.insert(column_id);
488 }
489 }
490
491 let num_tag = self
493 .column_metadatas
494 .iter()
495 .filter(|col| col.semantic_type == SemanticType::Tag)
496 .count();
497 ensure!(
498 num_tag == self.primary_key.len(),
499 InvalidMetaSnafu {
500 reason: format!(
501 "number of primary key columns {} not equal to tag columns {}",
502 self.primary_key.len(),
503 num_tag
504 ),
505 }
506 );
507
508 Ok(())
509 }
510
511 fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
513 if column_metadata.semantic_type == SemanticType::Timestamp {
514 ensure!(
515 column_metadata.column_schema.data_type.is_timestamp(),
516 InvalidMetaSnafu {
517 reason: format!(
518 "column `{}` is not timestamp type",
519 column_metadata.column_schema.name
520 ),
521 }
522 );
523 }
524
525 ensure!(
526 !is_internal_column(&column_metadata.column_schema.name),
527 InvalidMetaSnafu {
528 reason: format!(
529 "{} is internal column name that can not be used",
530 column_metadata.column_schema.name
531 ),
532 }
533 );
534
535 Ok(())
536 }
537}
538
539pub struct RegionMetadataBuilder {
541 region_id: RegionId,
542 column_metadatas: Vec<ColumnMetadata>,
543 primary_key: Vec<ColumnId>,
544 schema_version: u64,
545 primary_key_encoding: PrimaryKeyEncoding,
546 partition_expr: Option<String>,
547}
548
549impl RegionMetadataBuilder {
550 pub fn new(id: RegionId) -> Self {
552 Self {
553 region_id: id,
554 column_metadatas: vec![],
555 primary_key: vec![],
556 schema_version: 0,
557 primary_key_encoding: PrimaryKeyEncoding::Dense,
558 partition_expr: None,
559 }
560 }
561
562 pub fn from_existing(existing: RegionMetadata) -> Self {
564 Self {
565 column_metadatas: existing.column_metadatas,
566 primary_key: existing.primary_key,
567 region_id: existing.region_id,
568 schema_version: existing.schema_version,
569 primary_key_encoding: existing.primary_key_encoding,
570 partition_expr: existing.partition_expr,
571 }
572 }
573
574 pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
576 self.primary_key_encoding = encoding;
577 self
578 }
579
580 pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
582 self.partition_expr = expr_json;
583 self
584 }
585
586 pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
588 self.column_metadatas.push(column_metadata);
589 self
590 }
591
592 pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
594 self.primary_key = key;
595 self
596 }
597
598 pub fn bump_version(&mut self) -> &mut Self {
600 self.schema_version += 1;
601 self
602 }
603
604 pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
608 match kind {
609 AlterKind::AddColumns { columns } => self.add_columns(columns)?,
610 AlterKind::DropColumns { names } => self.drop_columns(&names),
611 AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
612 AlterKind::SetIndexes { options } => self.set_indexes(options)?,
613 AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
614 AlterKind::SetRegionOptions { options: _ } => {
615 }
617 AlterKind::UnsetRegionOptions { keys: _ } => {
618 }
620 AlterKind::DropDefaults { names } => {
621 self.drop_defaults(names)?;
622 }
623 AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
624 AlterKind::SyncColumns { column_metadatas } => {
625 self.primary_key = column_metadatas
626 .iter()
627 .filter_map(|column_metadata| {
628 if column_metadata.semantic_type == SemanticType::Tag {
629 Some(column_metadata.column_id)
630 } else {
631 None
632 }
633 })
634 .collect::<Vec<_>>();
635 self.column_metadatas = column_metadatas;
636 }
637 }
638 Ok(self)
639 }
640
641 pub fn build(self) -> Result<RegionMetadata> {
643 let skipped = SkippedFields::new(&self.column_metadatas)?;
644
645 let meta = RegionMetadata {
646 schema: skipped.schema,
647 time_index: skipped.time_index,
648 id_to_index: skipped.id_to_index,
649 column_metadatas: self.column_metadatas,
650 primary_key: self.primary_key,
651 region_id: self.region_id,
652 schema_version: self.schema_version,
653 primary_key_encoding: self.primary_key_encoding,
654 partition_expr: self.partition_expr,
655 };
656
657 meta.validate()?;
658
659 Ok(meta)
660 }
661
662 fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
664 let mut names: HashSet<_> = self
665 .column_metadatas
666 .iter()
667 .map(|col| col.column_schema.name.clone())
668 .collect();
669
670 for add_column in columns {
671 if names.contains(&add_column.column_metadata.column_schema.name) {
672 continue;
674 }
675
676 let column_id = add_column.column_metadata.column_id;
677 let semantic_type = add_column.column_metadata.semantic_type;
678 let column_name = add_column.column_metadata.column_schema.name.clone();
679 match add_column.location {
680 None => {
681 self.column_metadatas.push(add_column.column_metadata);
682 }
683 Some(AddColumnLocation::First) => {
684 self.column_metadatas.insert(0, add_column.column_metadata);
685 }
686 Some(AddColumnLocation::After { column_name }) => {
687 let pos = self
688 .column_metadatas
689 .iter()
690 .position(|col| col.column_schema.name == column_name)
691 .context(InvalidRegionRequestSnafu {
692 region_id: self.region_id,
693 err: format!(
694 "column {} not found, failed to add column {} after it",
695 column_name, add_column.column_metadata.column_schema.name
696 ),
697 })?;
698 self.column_metadatas
700 .insert(pos + 1, add_column.column_metadata);
701 }
702 }
703 names.insert(column_name);
704 if semantic_type == SemanticType::Tag {
705 self.primary_key.push(column_id);
707 }
708 }
709
710 Ok(())
711 }
712
713 fn drop_columns(&mut self, names: &[String]) {
715 let name_set: HashSet<_> = names.iter().collect();
716 self.column_metadatas
717 .retain(|col| !name_set.contains(&col.column_schema.name));
718 }
719
720 fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
722 let mut change_type_map: HashMap<_, _> = columns
723 .into_iter()
724 .map(
725 |ModifyColumnType {
726 column_name,
727 target_type,
728 }| (column_name, target_type),
729 )
730 .collect();
731
732 for column_meta in self.column_metadatas.iter_mut() {
733 if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
734 column_meta.column_schema.data_type = target_type.clone();
735 let new_default =
737 if let Some(default_value) = column_meta.column_schema.default_constraint() {
738 Some(
739 default_value
740 .cast_to_datatype(&target_type)
741 .with_context(|_| CastDefaultValueSnafu {
742 reason: format!(
743 "Failed to cast default value from {:?} to type {:?}",
744 default_value, target_type
745 ),
746 })?,
747 )
748 } else {
749 None
750 };
751 column_meta.column_schema = column_meta
752 .column_schema
753 .clone()
754 .with_default_constraint(new_default.clone())
755 .with_context(|_| CastDefaultValueSnafu {
756 reason: format!("Failed to set new default: {:?}", new_default),
757 })?;
758 }
759 }
760
761 Ok(())
762 }
763
764 fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
765 let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
766 for option in &options {
767 set_index_map
768 .entry(option.column_name())
769 .or_default()
770 .push(option);
771 }
772
773 for column_metadata in self.column_metadatas.iter_mut() {
774 if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
775 for option in options {
776 Self::set_index(column_metadata, option)?;
777 }
778 }
779 }
780
781 Ok(())
782 }
783
784 fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
785 let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
786 for option in &options {
787 unset_index_map
788 .entry(option.column_name())
789 .or_default()
790 .push(option);
791 }
792
793 for column_metadata in self.column_metadatas.iter_mut() {
794 if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
795 for option in options {
796 Self::unset_index(column_metadata, option)?;
797 }
798 }
799 }
800
801 Ok(())
802 }
803
804 fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
805 match options {
806 SetIndexOption::Fulltext {
807 column_name,
808 options,
809 } => {
810 ensure!(
811 column_metadata.column_schema.data_type.is_string(),
812 InvalidColumnOptionSnafu {
813 column_name,
814 msg: "FULLTEXT index only supports string type".to_string(),
815 }
816 );
817 let current_fulltext_options = column_metadata
818 .column_schema
819 .fulltext_options()
820 .with_context(|_| GetFulltextOptionsSnafu {
821 column_name: column_name.to_string(),
822 })?;
823 set_column_fulltext_options(
824 column_metadata,
825 column_name,
826 options,
827 current_fulltext_options,
828 )?;
829 }
830 SetIndexOption::Inverted { .. } => {
831 column_metadata.column_schema.set_inverted_index(true)
832 }
833 SetIndexOption::Skipping {
834 column_name,
835 options,
836 } => {
837 column_metadata
838 .column_schema
839 .set_skipping_options(options)
840 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
841 }
842 }
843
844 Ok(())
845 }
846
847 fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
848 match options {
849 UnsetIndexOption::Fulltext { column_name } => {
850 ensure!(
851 column_metadata.column_schema.data_type.is_string(),
852 InvalidColumnOptionSnafu {
853 column_name,
854 msg: "FULLTEXT index only supports string type".to_string(),
855 }
856 );
857
858 let current_fulltext_options = column_metadata
859 .column_schema
860 .fulltext_options()
861 .with_context(|_| GetFulltextOptionsSnafu {
862 column_name: column_name.to_string(),
863 })?;
864
865 unset_column_fulltext_options(
866 column_metadata,
867 column_name,
868 current_fulltext_options,
869 )?;
870 }
871 UnsetIndexOption::Inverted { .. } => {
872 column_metadata.column_schema.set_inverted_index(false)
873 }
874 UnsetIndexOption::Skipping { column_name } => {
875 column_metadata
876 .column_schema
877 .unset_skipping_options()
878 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
879 }
880 }
881
882 Ok(())
883 }
884
885 fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
886 for name in column_names.iter() {
887 let meta = self
888 .column_metadatas
889 .iter_mut()
890 .find(|col| col.column_schema.name == *name);
891 if let Some(meta) = meta {
892 if !meta.column_schema.is_nullable() {
893 return InvalidRegionRequestSnafu {
894 region_id: self.region_id,
895 err: format!(
896 "column {name} is not nullable and `default` cannot be dropped",
897 ),
898 }
899 .fail();
900 }
901 meta.column_schema = meta
902 .column_schema
903 .clone()
904 .with_default_constraint(None)
905 .with_context(|_| CastDefaultValueSnafu {
906 reason: format!("Failed to drop default : {name:?}"),
907 })?;
908 } else {
909 return InvalidRegionRequestSnafu {
910 region_id: self.region_id,
911 err: format!("column {name} not found",),
912 }
913 .fail();
914 }
915 }
916 Ok(())
917 }
918
919 fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
920 for set_default in set_defaults.iter() {
921 let meta = self
922 .column_metadatas
923 .iter_mut()
924 .find(|col| col.column_schema.name == set_default.name);
925 if let Some(meta) = meta {
926 let default_constraint = common_sql::convert::deserialize_default_constraint(
927 set_default.default_constraint.as_slice(),
928 &meta.column_schema.name,
929 &meta.column_schema.data_type,
930 )
931 .context(SqlCommonSnafu)?;
932
933 meta.column_schema = meta
934 .column_schema
935 .clone()
936 .with_default_constraint(default_constraint)
937 .with_context(|_| CastDefaultValueSnafu {
938 reason: format!("Failed to set default : {set_default:?}"),
939 })?;
940 } else {
941 return InvalidRegionRequestSnafu {
942 region_id: self.region_id,
943 err: format!("column {} not found", set_default.name),
944 }
945 .fail();
946 }
947 }
948 Ok(())
949 }
950}
951
952struct SkippedFields {
954 schema: SchemaRef,
956 time_index: ColumnId,
958 id_to_index: HashMap<ColumnId, usize>,
960}
961
962impl SkippedFields {
963 fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
965 let column_schemas = column_metadatas
966 .iter()
967 .map(|column_metadata| column_metadata.column_schema.clone())
968 .collect();
969 let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
970 let time_index = column_metadatas
971 .iter()
972 .find_map(|col| {
973 if col.semantic_type == SemanticType::Timestamp {
974 Some(col.column_id)
975 } else {
976 None
977 }
978 })
979 .context(InvalidMetaSnafu {
980 reason: "time index not found",
981 })?;
982 let id_to_index = column_metadatas
983 .iter()
984 .enumerate()
985 .map(|(idx, col)| (col.column_id, idx))
986 .collect();
987
988 Ok(SkippedFields {
989 schema,
990 time_index,
991 id_to_index,
992 })
993 }
994}
995
996#[derive(Snafu)]
997#[snafu(visibility(pub))]
998#[stack_trace_debug]
999pub enum MetadataError {
1000 #[snafu(display("Invalid schema"))]
1001 InvalidSchema {
1002 source: datatypes::error::Error,
1003 #[snafu(implicit)]
1004 location: Location,
1005 },
1006
1007 #[snafu(display("Invalid metadata, {}", reason))]
1008 InvalidMeta {
1009 reason: String,
1010 #[snafu(implicit)]
1011 location: Location,
1012 },
1013
1014 #[snafu(display("Failed to ser/de json object"))]
1015 SerdeJson {
1016 #[snafu(implicit)]
1017 location: Location,
1018 #[snafu(source)]
1019 error: serde_json::Error,
1020 },
1021
1022 #[snafu(display("Invalid raw region request, err: {}", err))]
1023 InvalidRawRegionRequest {
1024 err: String,
1025 #[snafu(implicit)]
1026 location: Location,
1027 },
1028
1029 #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1030 InvalidRegionRequest {
1031 region_id: RegionId,
1032 err: String,
1033 #[snafu(implicit)]
1034 location: Location,
1035 },
1036
1037 #[snafu(display("Unexpected schema error during project"))]
1038 SchemaProject {
1039 origin_schema: SchemaRef,
1040 projection: Vec<ColumnId>,
1041 #[snafu(implicit)]
1042 location: Location,
1043 source: datatypes::Error,
1044 },
1045
1046 #[snafu(display("Time index column not found"))]
1047 TimeIndexNotFound {
1048 #[snafu(implicit)]
1049 location: Location,
1050 },
1051
1052 #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1053 ChangeColumnNotFound {
1054 column_name: String,
1055 region_id: RegionId,
1056 #[snafu(implicit)]
1057 location: Location,
1058 },
1059
1060 #[snafu(display("Failed to convert column schema"))]
1061 ConvertColumnSchema {
1062 source: api::error::Error,
1063 #[snafu(implicit)]
1064 location: Location,
1065 },
1066
1067 #[snafu(display("Failed to convert TimeRanges"))]
1068 ConvertTimeRanges {
1069 source: api::error::Error,
1070 #[snafu(implicit)]
1071 location: Location,
1072 },
1073
1074 #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1075 InvalidSetRegionOptionRequest {
1076 key: String,
1077 value: String,
1078 #[snafu(implicit)]
1079 location: Location,
1080 },
1081
1082 #[snafu(display("Invalid set region option request, key: {}", key))]
1083 InvalidUnsetRegionOptionRequest {
1084 key: String,
1085 #[snafu(implicit)]
1086 location: Location,
1087 },
1088
1089 #[snafu(display("Failed to decode protobuf"))]
1090 DecodeProto {
1091 #[snafu(source)]
1092 error: prost::UnknownEnumValue,
1093 #[snafu(implicit)]
1094 location: Location,
1095 },
1096
1097 #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1098 InvalidColumnOption {
1099 column_name: String,
1100 msg: String,
1101 #[snafu(implicit)]
1102 location: Location,
1103 },
1104
1105 #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1106 SetFulltextOptions {
1107 column_name: String,
1108 source: datatypes::Error,
1109 #[snafu(implicit)]
1110 location: Location,
1111 },
1112
1113 #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1114 GetFulltextOptions {
1115 column_name: String,
1116 source: datatypes::Error,
1117 #[snafu(implicit)]
1118 location: Location,
1119 },
1120
1121 #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1122 SetSkippingIndexOptions {
1123 column_name: String,
1124 source: datatypes::Error,
1125 #[snafu(implicit)]
1126 location: Location,
1127 },
1128
1129 #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1130 UnsetSkippingIndexOptions {
1131 column_name: String,
1132 source: datatypes::Error,
1133 #[snafu(implicit)]
1134 location: Location,
1135 },
1136
1137 #[snafu(display("Failed to decode arrow ipc record batches"))]
1138 DecodeArrowIpc {
1139 #[snafu(source)]
1140 error: arrow::error::ArrowError,
1141 #[snafu(implicit)]
1142 location: Location,
1143 },
1144
1145 #[snafu(display("Failed to cast default value, reason: {}", reason))]
1146 CastDefaultValue {
1147 reason: String,
1148 source: datatypes::Error,
1149 #[snafu(implicit)]
1150 location: Location,
1151 },
1152
1153 #[snafu(display("Unexpected: {}", reason))]
1154 Unexpected {
1155 reason: String,
1156 #[snafu(implicit)]
1157 location: Location,
1158 },
1159
1160 #[snafu(display("Failed to encode/decode flight message"))]
1161 FlightCodec {
1162 source: common_grpc::Error,
1163 #[snafu(implicit)]
1164 location: Location,
1165 },
1166
1167 #[snafu(display("Invalid index option"))]
1168 InvalidIndexOption {
1169 #[snafu(implicit)]
1170 location: Location,
1171 #[snafu(source)]
1172 error: datatypes::error::Error,
1173 },
1174
1175 #[snafu(display("Sql common error"))]
1176 SqlCommon {
1177 source: common_sql::error::Error,
1178 #[snafu(implicit)]
1179 location: Location,
1180 },
1181}
1182
1183impl ErrorExt for MetadataError {
1184 fn status_code(&self) -> StatusCode {
1185 match self {
1186 Self::SqlCommon { source, .. } => source.status_code(),
1187 _ => StatusCode::InvalidArguments,
1188 }
1189 }
1190
1191 fn as_any(&self) -> &dyn Any {
1192 self
1193 }
1194}
1195
1196fn set_column_fulltext_options(
1205 column_meta: &mut ColumnMetadata,
1206 column_name: &str,
1207 options: &FulltextOptions,
1208 current_options: Option<FulltextOptions>,
1209) -> Result<()> {
1210 if let Some(current_options) = current_options {
1211 ensure!(
1212 current_options.analyzer == options.analyzer
1213 && current_options.case_sensitive == options.case_sensitive,
1214 InvalidColumnOptionSnafu {
1215 column_name,
1216 msg: format!(
1217 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1218 current_options.analyzer, current_options.case_sensitive
1219 ),
1220 }
1221 );
1222 }
1223
1224 column_meta
1225 .column_schema
1226 .set_fulltext_options(options)
1227 .context(SetFulltextOptionsSnafu { column_name })?;
1228
1229 Ok(())
1230}
1231
1232fn unset_column_fulltext_options(
1233 column_meta: &mut ColumnMetadata,
1234 column_name: &str,
1235 current_options: Option<FulltextOptions>,
1236) -> Result<()> {
1237 if let Some(mut current_options) = current_options
1238 && current_options.enable
1239 {
1240 current_options.enable = false;
1241 column_meta
1242 .column_schema
1243 .set_fulltext_options(¤t_options)
1244 .context(SetFulltextOptionsSnafu { column_name })?;
1245 } else {
1246 return InvalidColumnOptionSnafu {
1247 column_name,
1248 msg: "FULLTEXT index already disabled",
1249 }
1250 .fail();
1251 }
1252
1253 Ok(())
1254}
1255
1256#[cfg(test)]
1257mod test {
1258 use datatypes::prelude::ConcreteDataType;
1259 use datatypes::schema::{
1260 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1261 };
1262 use datatypes::value::Value;
1263
1264 use super::*;
1265
1266 fn create_builder() -> RegionMetadataBuilder {
1267 RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1268 }
1269
1270 fn build_test_region_metadata() -> RegionMetadata {
1271 let mut builder = create_builder();
1272 builder
1273 .push_column_metadata(ColumnMetadata {
1274 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1275 semantic_type: SemanticType::Tag,
1276 column_id: 1,
1277 })
1278 .push_column_metadata(ColumnMetadata {
1279 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1280 semantic_type: SemanticType::Field,
1281 column_id: 2,
1282 })
1283 .push_column_metadata(ColumnMetadata {
1284 column_schema: ColumnSchema::new(
1285 "c",
1286 ConcreteDataType::timestamp_millisecond_datatype(),
1287 false,
1288 ),
1289 semantic_type: SemanticType::Timestamp,
1290 column_id: 3,
1291 })
1292 .primary_key(vec![1])
1293 .partition_expr_json(Some("".to_string()));
1294 builder.build().unwrap()
1295 }
1296
1297 #[test]
1298 fn test_region_metadata() {
1299 let region_metadata = build_test_region_metadata();
1300 assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1301 assert_eq!(
1302 "a",
1303 region_metadata.column_by_id(1).unwrap().column_schema.name
1304 );
1305 assert_eq!(None, region_metadata.column_by_id(10));
1306 }
1307
1308 #[test]
1309 fn test_region_metadata_serde() {
1310 let region_metadata = build_test_region_metadata();
1311 let serialized = serde_json::to_string(®ion_metadata).unwrap();
1312 let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1313 assert_eq!(region_metadata, deserialized);
1314 }
1315
1316 #[test]
1317 fn test_column_metadata_validate() {
1318 let mut builder = create_builder();
1319 let col = ColumnMetadata {
1320 column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1321 semantic_type: SemanticType::Timestamp,
1322 column_id: 1,
1323 };
1324
1325 builder.push_column_metadata(col);
1326 let err = builder.build().unwrap_err();
1327 assert!(
1328 err.to_string()
1329 .contains("column `ts` is not timestamp type"),
1330 "unexpected err: {err}",
1331 );
1332 }
1333
1334 #[test]
1335 fn test_empty_region_metadata() {
1336 let builder = create_builder();
1337 let err = builder.build().unwrap_err();
1338 assert!(
1340 err.to_string().contains("time index not found"),
1341 "unexpected err: {err}",
1342 );
1343 }
1344
1345 #[test]
1346 fn test_same_column_id() {
1347 let mut builder = create_builder();
1348 builder
1349 .push_column_metadata(ColumnMetadata {
1350 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1351 semantic_type: SemanticType::Tag,
1352 column_id: 1,
1353 })
1354 .push_column_metadata(ColumnMetadata {
1355 column_schema: ColumnSchema::new(
1356 "b",
1357 ConcreteDataType::timestamp_millisecond_datatype(),
1358 false,
1359 ),
1360 semantic_type: SemanticType::Timestamp,
1361 column_id: 1,
1362 });
1363 let err = builder.build().unwrap_err();
1364 assert!(
1365 err.to_string()
1366 .contains("column a and b have the same column id"),
1367 "unexpected err: {err}",
1368 );
1369 }
1370
1371 #[test]
1372 fn test_duplicate_time_index() {
1373 let mut builder = create_builder();
1374 builder
1375 .push_column_metadata(ColumnMetadata {
1376 column_schema: ColumnSchema::new(
1377 "a",
1378 ConcreteDataType::timestamp_millisecond_datatype(),
1379 false,
1380 ),
1381 semantic_type: SemanticType::Timestamp,
1382 column_id: 1,
1383 })
1384 .push_column_metadata(ColumnMetadata {
1385 column_schema: ColumnSchema::new(
1386 "b",
1387 ConcreteDataType::timestamp_millisecond_datatype(),
1388 false,
1389 ),
1390 semantic_type: SemanticType::Timestamp,
1391 column_id: 2,
1392 });
1393 let err = builder.build().unwrap_err();
1394 assert!(
1395 err.to_string().contains("expect only one time index"),
1396 "unexpected err: {err}",
1397 );
1398 }
1399
1400 #[test]
1401 fn test_unknown_primary_key() {
1402 let mut builder = create_builder();
1403 builder
1404 .push_column_metadata(ColumnMetadata {
1405 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1406 semantic_type: SemanticType::Tag,
1407 column_id: 1,
1408 })
1409 .push_column_metadata(ColumnMetadata {
1410 column_schema: ColumnSchema::new(
1411 "b",
1412 ConcreteDataType::timestamp_millisecond_datatype(),
1413 false,
1414 ),
1415 semantic_type: SemanticType::Timestamp,
1416 column_id: 2,
1417 })
1418 .primary_key(vec![3]);
1419 let err = builder.build().unwrap_err();
1420 assert!(
1421 err.to_string().contains("unknown column id 3"),
1422 "unexpected err: {err}",
1423 );
1424 }
1425
1426 #[test]
1427 fn test_same_primary_key() {
1428 let mut builder = create_builder();
1429 builder
1430 .push_column_metadata(ColumnMetadata {
1431 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1432 semantic_type: SemanticType::Tag,
1433 column_id: 1,
1434 })
1435 .push_column_metadata(ColumnMetadata {
1436 column_schema: ColumnSchema::new(
1437 "b",
1438 ConcreteDataType::timestamp_millisecond_datatype(),
1439 false,
1440 ),
1441 semantic_type: SemanticType::Timestamp,
1442 column_id: 2,
1443 })
1444 .primary_key(vec![1, 1]);
1445 let err = builder.build().unwrap_err();
1446 assert!(
1447 err.to_string()
1448 .contains("duplicate column a in primary key"),
1449 "unexpected err: {err}",
1450 );
1451 }
1452
1453 #[test]
1454 fn test_in_time_index() {
1455 let mut builder = create_builder();
1456 builder
1457 .push_column_metadata(ColumnMetadata {
1458 column_schema: ColumnSchema::new(
1459 "ts",
1460 ConcreteDataType::timestamp_millisecond_datatype(),
1461 false,
1462 ),
1463 semantic_type: SemanticType::Timestamp,
1464 column_id: 1,
1465 })
1466 .primary_key(vec![1]);
1467 let err = builder.build().unwrap_err();
1468 assert!(
1469 err.to_string()
1470 .contains("column ts is already a time index column"),
1471 "unexpected err: {err}",
1472 );
1473 }
1474
1475 #[test]
1476 fn test_nullable_time_index() {
1477 let mut builder = create_builder();
1478 builder.push_column_metadata(ColumnMetadata {
1479 column_schema: ColumnSchema::new(
1480 "ts",
1481 ConcreteDataType::timestamp_millisecond_datatype(),
1482 true,
1483 ),
1484 semantic_type: SemanticType::Timestamp,
1485 column_id: 1,
1486 });
1487 let err = builder.build().unwrap_err();
1488 assert!(
1489 err.to_string()
1490 .contains("time index column ts must be NOT NULL"),
1491 "unexpected err: {err}",
1492 );
1493 }
1494
1495 #[test]
1496 fn test_primary_key_semantic_type() {
1497 let mut builder = create_builder();
1498 builder
1499 .push_column_metadata(ColumnMetadata {
1500 column_schema: ColumnSchema::new(
1501 "ts",
1502 ConcreteDataType::timestamp_millisecond_datatype(),
1503 false,
1504 ),
1505 semantic_type: SemanticType::Timestamp,
1506 column_id: 1,
1507 })
1508 .push_column_metadata(ColumnMetadata {
1509 column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1510 semantic_type: SemanticType::Field,
1511 column_id: 2,
1512 })
1513 .primary_key(vec![2]);
1514 let err = builder.build().unwrap_err();
1515 assert!(
1516 err.to_string()
1517 .contains("semantic type of column a should be Tag, not Field"),
1518 "unexpected err: {err}",
1519 );
1520 }
1521
1522 #[test]
1523 fn test_primary_key_tag_num() {
1524 let mut builder = create_builder();
1525 builder
1526 .push_column_metadata(ColumnMetadata {
1527 column_schema: ColumnSchema::new(
1528 "ts",
1529 ConcreteDataType::timestamp_millisecond_datatype(),
1530 false,
1531 ),
1532 semantic_type: SemanticType::Timestamp,
1533 column_id: 1,
1534 })
1535 .push_column_metadata(ColumnMetadata {
1536 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1537 semantic_type: SemanticType::Tag,
1538 column_id: 2,
1539 })
1540 .push_column_metadata(ColumnMetadata {
1541 column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1542 semantic_type: SemanticType::Tag,
1543 column_id: 3,
1544 })
1545 .primary_key(vec![2]);
1546 let err = builder.build().unwrap_err();
1547 assert!(
1548 err.to_string()
1549 .contains("number of primary key columns 1 not equal to tag columns 2"),
1550 "unexpected err: {err}",
1551 );
1552 }
1553
1554 #[test]
1555 fn test_bump_version() {
1556 let mut region_metadata = build_test_region_metadata();
1557 let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1558 builder.bump_version();
1559 let new_meta = builder.build().unwrap();
1560 region_metadata.schema_version += 1;
1561 assert_eq!(region_metadata, new_meta);
1562 }
1563
1564 fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1565 let semantic_type = if is_tag {
1566 SemanticType::Tag
1567 } else {
1568 SemanticType::Field
1569 };
1570 ColumnMetadata {
1571 column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1572 semantic_type,
1573 column_id,
1574 }
1575 }
1576
1577 fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1578 let actual: Vec<_> = metadata
1579 .column_metadatas
1580 .iter()
1581 .map(|col| &col.column_schema.name)
1582 .collect();
1583 assert_eq!(names, actual);
1584 }
1585
1586 fn get_columns_default_constraint(
1587 metadata: &RegionMetadata,
1588 name: String,
1589 ) -> Option<Option<&ColumnDefaultConstraint>> {
1590 metadata.column_metadatas.iter().find_map(|col| {
1591 if col.column_schema.name == name {
1592 Some(col.column_schema.default_constraint())
1593 } else {
1594 None
1595 }
1596 })
1597 }
1598
1599 #[test]
1600 fn test_alter() {
1601 let metadata = build_test_region_metadata();
1603 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1604 builder
1606 .alter(AlterKind::AddColumns {
1607 columns: vec![AddColumn {
1608 column_metadata: new_column_metadata("d", true, 4),
1609 location: None,
1610 }],
1611 })
1612 .unwrap();
1613 let metadata = builder.build().unwrap();
1614 check_columns(&metadata, &["a", "b", "c", "d"]);
1615 assert_eq!([1, 4], &metadata.primary_key[..]);
1616
1617 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1618 builder
1619 .alter(AlterKind::AddColumns {
1620 columns: vec![AddColumn {
1621 column_metadata: new_column_metadata("e", false, 5),
1622 location: Some(AddColumnLocation::First),
1623 }],
1624 })
1625 .unwrap();
1626 let metadata = builder.build().unwrap();
1627 check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1628
1629 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1630 builder
1631 .alter(AlterKind::AddColumns {
1632 columns: vec![AddColumn {
1633 column_metadata: new_column_metadata("f", false, 6),
1634 location: Some(AddColumnLocation::After {
1635 column_name: "b".to_string(),
1636 }),
1637 }],
1638 })
1639 .unwrap();
1640 let metadata = builder.build().unwrap();
1641 check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1642
1643 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1644 builder
1645 .alter(AlterKind::AddColumns {
1646 columns: vec![AddColumn {
1647 column_metadata: new_column_metadata("g", false, 7),
1648 location: Some(AddColumnLocation::After {
1649 column_name: "d".to_string(),
1650 }),
1651 }],
1652 })
1653 .unwrap();
1654 let metadata = builder.build().unwrap();
1655 check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1656
1657 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1658 builder
1659 .alter(AlterKind::DropColumns {
1660 names: vec!["g".to_string(), "e".to_string()],
1661 })
1662 .unwrap();
1663 let metadata = builder.build().unwrap();
1664 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1665
1666 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1667 builder
1668 .alter(AlterKind::DropColumns {
1669 names: vec!["a".to_string()],
1670 })
1671 .unwrap();
1672 let err = builder.build().unwrap_err();
1674 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1675
1676 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1677 let mut column_metadata = new_column_metadata("g", false, 8);
1678 let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1679 column_metadata.column_schema = column_metadata
1680 .column_schema
1681 .with_default_constraint(default_constraint.clone())
1682 .unwrap();
1683 builder
1684 .alter(AlterKind::AddColumns {
1685 columns: vec![AddColumn {
1686 column_metadata,
1687 location: None,
1688 }],
1689 })
1690 .unwrap();
1691 let metadata = builder.build().unwrap();
1692 assert_eq!(
1693 get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1694 default_constraint.as_ref()
1695 );
1696 check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1697
1698 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1699 builder
1700 .alter(AlterKind::DropDefaults {
1701 names: vec!["g".to_string()],
1702 })
1703 .unwrap();
1704 let metadata = builder.build().unwrap();
1705 assert_eq!(
1706 get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1707 None
1708 );
1709 check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1710
1711 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1712 builder
1713 .alter(AlterKind::DropColumns {
1714 names: vec!["g".to_string()],
1715 })
1716 .unwrap();
1717 let metadata = builder.build().unwrap();
1718 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1719
1720 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1721 builder
1722 .alter(AlterKind::ModifyColumnTypes {
1723 columns: vec![ModifyColumnType {
1724 column_name: "b".to_string(),
1725 target_type: ConcreteDataType::string_datatype(),
1726 }],
1727 })
1728 .unwrap();
1729 let metadata = builder.build().unwrap();
1730 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1731 let b_type = &metadata
1732 .column_by_name("b")
1733 .unwrap()
1734 .column_schema
1735 .data_type;
1736 assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1737
1738 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1739 builder
1740 .alter(AlterKind::SetIndexes {
1741 options: vec![SetIndexOption::Fulltext {
1742 column_name: "b".to_string(),
1743 options: FulltextOptions::new_unchecked(
1744 true,
1745 FulltextAnalyzer::Chinese,
1746 true,
1747 FulltextBackend::Bloom,
1748 1000,
1749 0.01,
1750 ),
1751 }],
1752 })
1753 .unwrap();
1754 let metadata = builder.build().unwrap();
1755 let a_fulltext_options = metadata
1756 .column_by_name("b")
1757 .unwrap()
1758 .column_schema
1759 .fulltext_options()
1760 .unwrap()
1761 .unwrap();
1762 assert!(a_fulltext_options.enable);
1763 assert_eq!(
1764 datatypes::schema::FulltextAnalyzer::Chinese,
1765 a_fulltext_options.analyzer
1766 );
1767 assert!(a_fulltext_options.case_sensitive);
1768
1769 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1770 builder
1771 .alter(AlterKind::UnsetIndexes {
1772 options: vec![UnsetIndexOption::Fulltext {
1773 column_name: "b".to_string(),
1774 }],
1775 })
1776 .unwrap();
1777 let metadata = builder.build().unwrap();
1778 let a_fulltext_options = metadata
1779 .column_by_name("b")
1780 .unwrap()
1781 .column_schema
1782 .fulltext_options()
1783 .unwrap()
1784 .unwrap();
1785 assert!(!a_fulltext_options.enable);
1786 assert_eq!(
1787 datatypes::schema::FulltextAnalyzer::Chinese,
1788 a_fulltext_options.analyzer
1789 );
1790 assert!(a_fulltext_options.case_sensitive);
1791 }
1792
1793 #[test]
1794 fn test_add_if_not_exists() {
1795 let metadata = build_test_region_metadata();
1797 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1798 builder
1800 .alter(AlterKind::AddColumns {
1801 columns: vec![
1802 AddColumn {
1803 column_metadata: new_column_metadata("d", true, 4),
1804 location: None,
1805 },
1806 AddColumn {
1807 column_metadata: new_column_metadata("d", true, 4),
1808 location: None,
1809 },
1810 ],
1811 })
1812 .unwrap();
1813 let metadata = builder.build().unwrap();
1814 check_columns(&metadata, &["a", "b", "c", "d"]);
1815 assert_eq!([1, 4], &metadata.primary_key[..]);
1816
1817 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1818 builder
1820 .alter(AlterKind::AddColumns {
1821 columns: vec![AddColumn {
1822 column_metadata: new_column_metadata("b", false, 2),
1823 location: None,
1824 }],
1825 })
1826 .unwrap();
1827 let metadata = builder.build().unwrap();
1828 check_columns(&metadata, &["a", "b", "c", "d"]);
1829 }
1830
1831 #[test]
1832 fn test_add_column_with_inverted_index() {
1833 let metadata = build_test_region_metadata();
1837 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1838 let mut col = new_column_metadata("d", true, 4);
1840 col.column_schema.set_inverted_index(true);
1841 builder
1842 .alter(AlterKind::AddColumns {
1843 columns: vec![
1844 AddColumn {
1845 column_metadata: col,
1846 location: None,
1847 },
1848 AddColumn {
1849 column_metadata: new_column_metadata("e", true, 5),
1850 location: None,
1851 },
1852 ],
1853 })
1854 .unwrap();
1855 let metadata = builder.build().unwrap();
1856 check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1857 assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1858 let column_metadata = metadata.column_by_name("a").unwrap();
1859 assert!(!column_metadata.column_schema.is_inverted_indexed());
1860 let column_metadata = metadata.column_by_name("b").unwrap();
1861 assert!(!column_metadata.column_schema.is_inverted_indexed());
1862 let column_metadata = metadata.column_by_name("c").unwrap();
1863 assert!(!column_metadata.column_schema.is_inverted_indexed());
1864 let column_metadata = metadata.column_by_name("d").unwrap();
1865 assert!(column_metadata.column_schema.is_inverted_indexed());
1866 let column_metadata = metadata.column_by_name("e").unwrap();
1867 assert!(!column_metadata.column_schema.is_inverted_indexed());
1868 }
1869
1870 #[test]
1871 fn test_drop_if_exists() {
1872 let metadata = build_test_region_metadata();
1874 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1875 builder
1877 .alter(AlterKind::AddColumns {
1878 columns: vec![
1879 AddColumn {
1880 column_metadata: new_column_metadata("d", false, 4),
1881 location: None,
1882 },
1883 AddColumn {
1884 column_metadata: new_column_metadata("e", false, 5),
1885 location: None,
1886 },
1887 ],
1888 })
1889 .unwrap();
1890 let metadata = builder.build().unwrap();
1891 check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1892
1893 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1894 builder
1895 .alter(AlterKind::DropColumns {
1896 names: vec!["b".to_string(), "b".to_string()],
1897 })
1898 .unwrap();
1899 let metadata = builder.build().unwrap();
1900 check_columns(&metadata, &["a", "c", "d", "e"]);
1901
1902 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1903 builder
1904 .alter(AlterKind::DropColumns {
1905 names: vec!["b".to_string(), "e".to_string()],
1906 })
1907 .unwrap();
1908 let metadata = builder.build().unwrap();
1909 check_columns(&metadata, &["a", "c", "d"]);
1910 }
1911
1912 #[test]
1913 fn test_invalid_column_name() {
1914 let mut builder = create_builder();
1915 builder.push_column_metadata(ColumnMetadata {
1916 column_schema: ColumnSchema::new(
1917 "__sequence",
1918 ConcreteDataType::timestamp_millisecond_datatype(),
1919 false,
1920 ),
1921 semantic_type: SemanticType::Timestamp,
1922 column_id: 1,
1923 });
1924 let err = builder.build().unwrap_err();
1925 assert!(
1926 err.to_string()
1927 .contains("internal column name that can not be used"),
1928 "unexpected err: {err}",
1929 );
1930 }
1931
1932 #[test]
1933 fn test_debug_for_column_metadata() {
1934 let region_metadata = build_test_region_metadata();
1935 let formatted = format!("{:?}", region_metadata);
1936 assert_eq!(
1937 formatted,
1938 "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
1939 );
1940 }
1941
1942 #[test]
1943 fn test_region_metadata_deserialize_default_primary_key_encoding() {
1944 let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
1945 let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1946 assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
1947
1948 let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
1949 let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1950 assert_eq!(
1951 deserialized.primary_key_encoding,
1952 PrimaryKeyEncoding::Sparse
1953 );
1954 }
1955}