1use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
33use datatypes::types::TimestampType;
34use itertools::Itertools;
35use serde::de::Error;
36use serde::{Deserialize, Deserializer, Serialize};
37use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
38
39use crate::codec::PrimaryKeyEncoding;
40use crate::region_request::{
41 AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type used throughout this module; the error is always [`MetadataError`].
pub type Result<T> = std::result::Result<T, MetadataError>;
47
/// Metadata of a single column: its schema together with the semantic role and
/// the id the column has inside a region.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of the column (name, data type, default constraint, index options).
    pub column_schema: ColumnSchema,
    /// Semantic type of the column (tag, field or timestamp).
    pub semantic_type: SemanticType,
    /// Id of the column. `RegionMetadata::validate` rejects duplicated ids.
    pub column_id: ColumnId,
}
58
59impl fmt::Debug for ColumnMetadata {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 write!(
62 f,
63 "[{:?} {:?} {:?}]",
64 self.column_schema, self.semantic_type, self.column_id,
65 )
66 }
67}
68
69impl ColumnMetadata {
70 pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72 let column_id = column_def.column_id;
73 let column_def = column_def
74 .column_def
75 .context(InvalidRawRegionRequestSnafu {
76 err: "column_def is absent",
77 })?;
78 let semantic_type = column_def.semantic_type();
79 let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81 Ok(Self {
82 column_schema,
83 semantic_type,
84 column_id,
85 })
86 }
87
88 pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90 serde_json::to_vec(columns)
91 }
92
93 pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95 serde_json::from_slice(bytes)
96 }
97
98 pub fn is_same_datatype(&self, other: &Self) -> bool {
99 self.column_schema.data_type == other.column_schema.data_type
100 }
101}
102
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Static metadata of a region: its columns, primary key and derived schema.
///
/// `schema`, `time_index` and `id_to_index` are skipped during serialization;
/// they are recomputed from `column_metadatas` whenever the metadata is built
/// or deserialized.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Schema built from the column schemas of `column_metadatas`, in the same order.
    #[serde(skip)]
    pub schema: SchemaRef,

    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Maps a column id to the position of that column in `column_metadatas`.
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Metadata of every column of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns; validation requires each of them to be a tag.
    pub primary_key: Vec<ColumnId>,

    /// Id of the region this metadata describes.
    pub region_id: RegionId,
    /// Version of the schema, incremented by `RegionMetadataBuilder::bump_version`.
    pub schema_version: u64,

    /// Encoding of the primary key. Falls back to the type's default when the
    /// field is missing from serialized metadata.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Optional partition expression kept as a string.
    /// NOTE(review): presumably JSON, judging by the builder's
    /// `partition_expr_json` setter — confirm.
    pub partition_expr: Option<String>,
}
164
165impl fmt::Debug for RegionMetadata {
166 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167 f.debug_struct("RegionMetadata")
168 .field("column_metadatas", &self.column_metadatas)
169 .field("time_index", &self.time_index)
170 .field("primary_key", &self.primary_key)
171 .field("region_id", &self.region_id)
172 .field("schema_version", &self.schema_version)
173 .field("partition_expr", &self.partition_expr)
174 .finish()
175 }
176}
177
/// Shared, reference-counted handle to a [`RegionMetadata`].
pub type RegionMetadataRef = Arc<RegionMetadata>;
179
180impl<'de> Deserialize<'de> for RegionMetadata {
181 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
182 where
183 D: Deserializer<'de>,
184 {
185 #[derive(Deserialize)]
187 struct RegionMetadataWithoutSchema {
188 column_metadatas: Vec<ColumnMetadata>,
189 primary_key: Vec<ColumnId>,
190 region_id: RegionId,
191 schema_version: u64,
192 #[serde(default)]
193 primary_key_encoding: PrimaryKeyEncoding,
194 #[serde(default)]
195 partition_expr: Option<String>,
196 }
197
198 let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
199 let skipped =
200 SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
201
202 Ok(Self {
203 schema: skipped.schema,
204 time_index: skipped.time_index,
205 id_to_index: skipped.id_to_index,
206 column_metadatas: without_schema.column_metadatas,
207 primary_key: without_schema.primary_key,
208 region_id: without_schema.region_id,
209 schema_version: without_schema.schema_version,
210 primary_key_encoding: without_schema.primary_key_encoding,
211 partition_expr: without_schema.partition_expr,
212 })
213 }
214}
215
216impl RegionMetadata {
217 pub fn from_json(s: &str) -> Result<Self> {
219 serde_json::from_str(s).context(SerdeJsonSnafu)
220 }
221
222 pub fn to_json(&self) -> Result<String> {
224 serde_json::to_string(&self).context(SerdeJsonSnafu)
225 }
226
227 pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
229 self.id_to_index
230 .get(&column_id)
231 .map(|index| &self.column_metadatas[*index])
232 }
233
234 pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
236 self.id_to_index.get(&column_id).copied()
237 }
238
239 pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
241 self.column_metadatas
242 .iter()
243 .position(|col| col.column_schema.name == column_name)
244 }
245
246 pub fn time_index_column(&self) -> &ColumnMetadata {
251 let index = self.id_to_index[&self.time_index];
252 &self.column_metadatas[index]
253 }
254
255 pub fn time_index_type(&self) -> TimestampType {
260 let index = self.id_to_index[&self.time_index];
261 self.column_metadatas[index]
262 .column_schema
263 .data_type
264 .as_timestamp()
265 .unwrap()
266 }
267
268 pub fn time_index_column_pos(&self) -> usize {
270 self.id_to_index[&self.time_index]
271 }
272
273 pub fn time_index_field(&self) -> FieldRef {
275 let index = self.id_to_index[&self.time_index];
276 self.schema.arrow_schema().fields[index].clone()
277 }
278
279 pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
281 self.schema
282 .column_index_by_name(name)
283 .map(|index| &self.column_metadatas[index])
284 }
285
286 pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288 self.primary_key
290 .iter()
291 .map(|id| self.column_by_id(*id).unwrap())
292 }
293
294 pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
299 self.column_metadatas
300 .iter()
301 .filter(|column| column.semantic_type == SemanticType::Field)
302 }
303
304 pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
308 self.primary_key.iter().position(|id| *id == column_id)
309 }
310
311 pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
315 ensure!(
317 projection.contains(&self.time_index),
318 TimeIndexNotFoundSnafu
319 );
320
321 let indices_to_preserve = projection
323 .iter()
324 .map(|id| {
325 self.column_index_by_id(*id)
326 .with_context(|| InvalidRegionRequestSnafu {
327 region_id: self.region_id,
328 err: format!("column id {} not found", id),
329 })
330 })
331 .collect::<Result<Vec<_>>>()?;
332
333 let projected_schema =
335 self.schema
336 .try_project(&indices_to_preserve)
337 .with_context(|_| SchemaProjectSnafu {
338 origin_schema: self.schema.clone(),
339 projection: projection.to_vec(),
340 })?;
341
342 let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
344 let mut projected_primary_key = vec![];
345 let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
346 for index in indices_to_preserve {
347 let col = self.column_metadatas[index].clone();
348 if col.semantic_type == SemanticType::Tag {
349 projected_primary_key.push(col.column_id);
350 }
351 projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
352 projected_column_metadatas.push(col);
353 }
354
355 Ok(RegionMetadata {
356 schema: Arc::new(projected_schema),
357 time_index: self.time_index,
358 id_to_index: projected_id_to_index,
359 column_metadatas: projected_column_metadatas,
360 primary_key: projected_primary_key,
361 region_id: self.region_id,
362 schema_version: self.schema_version,
363 primary_key_encoding: self.primary_key_encoding,
364 partition_expr: self.partition_expr.clone(),
365 })
366 }
367
368 pub fn inverted_indexed_column_ids<'a>(
370 &self,
371 ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
372 ) -> HashSet<ColumnId> {
373 let mut inverted_index = self
374 .column_metadatas
375 .iter()
376 .filter(|column| column.column_schema.is_inverted_indexed())
377 .map(|column| column.column_id)
378 .collect::<HashSet<_>>();
379
380 for ignored in ignore_column_ids {
381 inverted_index.remove(ignored);
382 }
383
384 inverted_index
385 }
386
387 fn validate(&self) -> Result<()> {
389 let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
391 for col in &self.column_metadatas {
392 Self::validate_column_metadata(col)?;
394
395 ensure!(
398 !id_names.contains_key(&col.column_id),
399 InvalidMetaSnafu {
400 reason: format!(
401 "column {} and {} have the same column id {}",
402 id_names[&col.column_id], col.column_schema.name, col.column_id,
403 ),
404 }
405 );
406 id_names.insert(col.column_id, &col.column_schema.name);
407 }
408
409 let time_indexes = self
411 .column_metadatas
412 .iter()
413 .filter(|col| col.semantic_type == SemanticType::Timestamp)
414 .collect::<Vec<_>>();
415 ensure!(
416 time_indexes.len() == 1,
417 InvalidMetaSnafu {
418 reason: format!(
419 "expect only one time index, found {}: {}",
420 time_indexes.len(),
421 time_indexes
422 .iter()
423 .map(|c| &c.column_schema.name)
424 .join(", ")
425 ),
426 }
427 );
428
429 ensure!(
431 !self.time_index_column().column_schema.is_nullable(),
432 InvalidMetaSnafu {
433 reason: format!(
434 "time index column {} must be NOT NULL",
435 self.time_index_column().column_schema.name
436 ),
437 }
438 );
439
440 if !self.primary_key.is_empty() {
441 let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
442 for column_id in &self.primary_key {
444 ensure!(
446 id_names.contains_key(column_id),
447 InvalidMetaSnafu {
448 reason: format!("unknown column id {}", column_id),
449 }
450 );
451
452 let column = self.column_by_id(*column_id).unwrap();
454 ensure!(
456 !pk_ids.contains(&column_id),
457 InvalidMetaSnafu {
458 reason: format!(
459 "duplicate column {} in primary key",
460 column.column_schema.name
461 ),
462 }
463 );
464
465 ensure!(
467 *column_id != self.time_index,
468 InvalidMetaSnafu {
469 reason: format!(
470 "column {} is already a time index column",
471 column.column_schema.name,
472 ),
473 }
474 );
475
476 ensure!(
478 column.semantic_type == SemanticType::Tag,
479 InvalidMetaSnafu {
480 reason: format!(
481 "semantic type of column {} should be Tag, not {:?}",
482 column.column_schema.name, column.semantic_type
483 ),
484 }
485 );
486
487 pk_ids.insert(column_id);
488 }
489 }
490
491 let num_tag = self
493 .column_metadatas
494 .iter()
495 .filter(|col| col.semantic_type == SemanticType::Tag)
496 .count();
497 ensure!(
498 num_tag == self.primary_key.len(),
499 InvalidMetaSnafu {
500 reason: format!(
501 "number of primary key columns {} not equal to tag columns {}",
502 self.primary_key.len(),
503 num_tag
504 ),
505 }
506 );
507
508 Ok(())
509 }
510
511 fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
513 if column_metadata.semantic_type == SemanticType::Timestamp {
514 ensure!(
515 column_metadata.column_schema.data_type.is_timestamp(),
516 InvalidMetaSnafu {
517 reason: format!(
518 "column `{}` is not timestamp type",
519 column_metadata.column_schema.name
520 ),
521 }
522 );
523 }
524
525 ensure!(
526 !is_internal_column(&column_metadata.column_schema.name),
527 InvalidMetaSnafu {
528 reason: format!(
529 "{} is internal column name that can not be used",
530 column_metadata.column_schema.name
531 ),
532 }
533 );
534
535 Ok(())
536 }
537}
538
/// Builder that assembles a [`RegionMetadata`], either from scratch or by
/// altering an existing one.
pub struct RegionMetadataBuilder {
    /// Id of the region being described.
    region_id: RegionId,
    /// Columns collected so far, in schema order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns.
    primary_key: Vec<ColumnId>,
    /// Schema version that the built metadata will carry.
    schema_version: u64,
    /// Primary key encoding that the built metadata will carry.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Optional partition expression that the built metadata will carry.
    partition_expr: Option<String>,
}
548
549impl RegionMetadataBuilder {
550 pub fn new(id: RegionId) -> Self {
552 Self {
553 region_id: id,
554 column_metadatas: vec![],
555 primary_key: vec![],
556 schema_version: 0,
557 primary_key_encoding: PrimaryKeyEncoding::Dense,
558 partition_expr: None,
559 }
560 }
561
562 pub fn from_existing(existing: RegionMetadata) -> Self {
564 Self {
565 column_metadatas: existing.column_metadatas,
566 primary_key: existing.primary_key,
567 region_id: existing.region_id,
568 schema_version: existing.schema_version,
569 primary_key_encoding: existing.primary_key_encoding,
570 partition_expr: existing.partition_expr,
571 }
572 }
573
574 pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
576 self.primary_key_encoding = encoding;
577 self
578 }
579
580 pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
582 self.partition_expr = expr_json;
583 self
584 }
585
586 pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
588 self.column_metadatas.push(column_metadata);
589 self
590 }
591
592 pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
594 self.primary_key = key;
595 self
596 }
597
598 pub fn bump_version(&mut self) -> &mut Self {
600 self.schema_version += 1;
601 self
602 }
603
604 pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
608 match kind {
609 AlterKind::AddColumns { columns } => self.add_columns(columns)?,
610 AlterKind::DropColumns { names } => self.drop_columns(&names),
611 AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
612 AlterKind::SetIndexes { options } => self.set_indexes(options)?,
613 AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
614 AlterKind::SetRegionOptions { options: _ } => {
615 }
617 AlterKind::UnsetRegionOptions { keys: _ } => {
618 }
620 AlterKind::DropDefaults { names } => {
621 self.drop_defaults(names)?;
622 }
623 AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
624 AlterKind::SyncColumns { column_metadatas } => {
625 self.primary_key = column_metadatas
626 .iter()
627 .filter_map(|column_metadata| {
628 if column_metadata.semantic_type == SemanticType::Tag {
629 Some(column_metadata.column_id)
630 } else {
631 None
632 }
633 })
634 .collect::<Vec<_>>();
635 self.column_metadatas = column_metadatas;
636 }
637 }
638 Ok(self)
639 }
640
641 pub fn build(self) -> Result<RegionMetadata> {
643 self.build_with_options(true)
644 }
645
646 pub fn build_without_validation(self) -> Result<RegionMetadata> {
651 self.build_with_options(false)
652 }
653
654 fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
655 let skipped = SkippedFields::new(&self.column_metadatas)?;
656
657 let meta = RegionMetadata {
658 schema: skipped.schema,
659 time_index: skipped.time_index,
660 id_to_index: skipped.id_to_index,
661 column_metadatas: self.column_metadatas,
662 primary_key: self.primary_key,
663 region_id: self.region_id,
664 schema_version: self.schema_version,
665 primary_key_encoding: self.primary_key_encoding,
666 partition_expr: self.partition_expr,
667 };
668
669 if validate {
670 meta.validate()?;
671 }
672
673 Ok(meta)
674 }
675
676 fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
678 let mut names: HashSet<_> = self
679 .column_metadatas
680 .iter()
681 .map(|col| col.column_schema.name.clone())
682 .collect();
683
684 for add_column in columns {
685 if names.contains(&add_column.column_metadata.column_schema.name) {
686 continue;
688 }
689
690 let column_id = add_column.column_metadata.column_id;
691 let semantic_type = add_column.column_metadata.semantic_type;
692 let column_name = add_column.column_metadata.column_schema.name.clone();
693 match add_column.location {
694 None => {
695 self.column_metadatas.push(add_column.column_metadata);
696 }
697 Some(AddColumnLocation::First) => {
698 self.column_metadatas.insert(0, add_column.column_metadata);
699 }
700 Some(AddColumnLocation::After { column_name }) => {
701 let pos = self
702 .column_metadatas
703 .iter()
704 .position(|col| col.column_schema.name == column_name)
705 .context(InvalidRegionRequestSnafu {
706 region_id: self.region_id,
707 err: format!(
708 "column {} not found, failed to add column {} after it",
709 column_name, add_column.column_metadata.column_schema.name
710 ),
711 })?;
712 self.column_metadatas
714 .insert(pos + 1, add_column.column_metadata);
715 }
716 }
717 names.insert(column_name);
718 if semantic_type == SemanticType::Tag {
719 self.primary_key.push(column_id);
721 }
722 }
723
724 Ok(())
725 }
726
727 fn drop_columns(&mut self, names: &[String]) {
729 let name_set: HashSet<_> = names.iter().collect();
730 self.column_metadatas
731 .retain(|col| !name_set.contains(&col.column_schema.name));
732 }
733
734 fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
736 let mut change_type_map: HashMap<_, _> = columns
737 .into_iter()
738 .map(
739 |ModifyColumnType {
740 column_name,
741 target_type,
742 }| (column_name, target_type),
743 )
744 .collect();
745
746 for column_meta in self.column_metadatas.iter_mut() {
747 if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
748 column_meta.column_schema.data_type = target_type.clone();
749 let new_default =
751 if let Some(default_value) = column_meta.column_schema.default_constraint() {
752 Some(
753 default_value
754 .cast_to_datatype(&target_type)
755 .with_context(|_| CastDefaultValueSnafu {
756 reason: format!(
757 "Failed to cast default value from {:?} to type {:?}",
758 default_value, target_type
759 ),
760 })?,
761 )
762 } else {
763 None
764 };
765 column_meta.column_schema = column_meta
766 .column_schema
767 .clone()
768 .with_default_constraint(new_default.clone())
769 .with_context(|_| CastDefaultValueSnafu {
770 reason: format!("Failed to set new default: {:?}", new_default),
771 })?;
772 }
773 }
774
775 Ok(())
776 }
777
778 fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
779 let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
780 for option in &options {
781 set_index_map
782 .entry(option.column_name())
783 .or_default()
784 .push(option);
785 }
786
787 for column_metadata in self.column_metadatas.iter_mut() {
788 if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
789 for option in options {
790 Self::set_index(column_metadata, option)?;
791 }
792 }
793 }
794
795 Ok(())
796 }
797
798 fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
799 let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
800 for option in &options {
801 unset_index_map
802 .entry(option.column_name())
803 .or_default()
804 .push(option);
805 }
806
807 for column_metadata in self.column_metadatas.iter_mut() {
808 if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
809 for option in options {
810 Self::unset_index(column_metadata, option)?;
811 }
812 }
813 }
814
815 Ok(())
816 }
817
818 fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
819 match options {
820 SetIndexOption::Fulltext {
821 column_name,
822 options,
823 } => {
824 ensure!(
825 column_metadata.column_schema.data_type.is_string(),
826 InvalidColumnOptionSnafu {
827 column_name,
828 msg: "FULLTEXT index only supports string type".to_string(),
829 }
830 );
831 let current_fulltext_options = column_metadata
832 .column_schema
833 .fulltext_options()
834 .with_context(|_| GetFulltextOptionsSnafu {
835 column_name: column_name.clone(),
836 })?;
837 set_column_fulltext_options(
838 column_metadata,
839 column_name,
840 options,
841 current_fulltext_options,
842 )?;
843 }
844 SetIndexOption::Inverted { .. } => {
845 column_metadata.column_schema.set_inverted_index(true)
846 }
847 SetIndexOption::Skipping {
848 column_name,
849 options,
850 } => {
851 column_metadata
852 .column_schema
853 .set_skipping_options(options)
854 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
855 }
856 }
857
858 Ok(())
859 }
860
861 fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
862 match options {
863 UnsetIndexOption::Fulltext { column_name } => {
864 ensure!(
865 column_metadata.column_schema.data_type.is_string(),
866 InvalidColumnOptionSnafu {
867 column_name,
868 msg: "FULLTEXT index only supports string type".to_string(),
869 }
870 );
871
872 let current_fulltext_options = column_metadata
873 .column_schema
874 .fulltext_options()
875 .with_context(|_| GetFulltextOptionsSnafu {
876 column_name: column_name.clone(),
877 })?;
878
879 unset_column_fulltext_options(
880 column_metadata,
881 column_name,
882 current_fulltext_options,
883 )?;
884 }
885 UnsetIndexOption::Inverted { .. } => {
886 column_metadata.column_schema.set_inverted_index(false)
887 }
888 UnsetIndexOption::Skipping { column_name } => {
889 column_metadata
890 .column_schema
891 .unset_skipping_options()
892 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
893 }
894 }
895
896 Ok(())
897 }
898
899 fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
900 for name in column_names.iter() {
901 let meta = self
902 .column_metadatas
903 .iter_mut()
904 .find(|col| col.column_schema.name == *name);
905 if let Some(meta) = meta {
906 if !meta.column_schema.is_nullable() {
907 return InvalidRegionRequestSnafu {
908 region_id: self.region_id,
909 err: format!(
910 "column {name} is not nullable and `default` cannot be dropped",
911 ),
912 }
913 .fail();
914 }
915 meta.column_schema = meta
916 .column_schema
917 .clone()
918 .with_default_constraint(None)
919 .with_context(|_| CastDefaultValueSnafu {
920 reason: format!("Failed to drop default : {name:?}"),
921 })?;
922 } else {
923 return InvalidRegionRequestSnafu {
924 region_id: self.region_id,
925 err: format!("column {name} not found",),
926 }
927 .fail();
928 }
929 }
930 Ok(())
931 }
932
933 fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
934 for set_default in set_defaults.iter() {
935 let meta = self
936 .column_metadatas
937 .iter_mut()
938 .find(|col| col.column_schema.name == set_default.name);
939 if let Some(meta) = meta {
940 let default_constraint = common_sql::convert::deserialize_default_constraint(
941 set_default.default_constraint.as_slice(),
942 &meta.column_schema.name,
943 &meta.column_schema.data_type,
944 )
945 .context(SqlCommonSnafu)?;
946
947 meta.column_schema = meta
948 .column_schema
949 .clone()
950 .with_default_constraint(default_constraint)
951 .with_context(|_| CastDefaultValueSnafu {
952 reason: format!("Failed to set default : {set_default:?}"),
953 })?;
954 } else {
955 return InvalidRegionRequestSnafu {
956 region_id: self.region_id,
957 err: format!("column {} not found", set_default.name),
958 }
959 .fail();
960 }
961 }
962 Ok(())
963 }
964}
965
/// Fields of [`RegionMetadata`] that are not serialized and are derived from
/// the column metadata instead.
struct SkippedFields {
    /// Schema built from the column schemas.
    schema: SchemaRef,
    /// Id of the first column whose semantic type is `Timestamp`.
    time_index: ColumnId,
    /// Maps a column id to the position of the column in the column list.
    id_to_index: HashMap<ColumnId, usize>,
}
975
976impl SkippedFields {
977 fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
979 let column_schemas = column_metadatas
980 .iter()
981 .map(|column_metadata| column_metadata.column_schema.clone())
982 .collect();
983 let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
984 let time_index = column_metadatas
985 .iter()
986 .find_map(|col| {
987 if col.semantic_type == SemanticType::Timestamp {
988 Some(col.column_id)
989 } else {
990 None
991 }
992 })
993 .context(InvalidMetaSnafu {
994 reason: "time index not found",
995 })?;
996 let id_to_index = column_metadatas
997 .iter()
998 .enumerate()
999 .map(|(idx, col)| (col.column_id, idx))
1000 .collect();
1001
1002 Ok(SkippedFields {
1003 schema,
1004 time_index,
1005 id_to_index,
1006 })
1007 }
1008}
1009
1010#[derive(Snafu)]
1011#[snafu(visibility(pub))]
1012#[stack_trace_debug]
1013pub enum MetadataError {
1014 #[snafu(display("Invalid schema"))]
1015 InvalidSchema {
1016 source: datatypes::error::Error,
1017 #[snafu(implicit)]
1018 location: Location,
1019 },
1020
1021 #[snafu(display("Invalid metadata, {}", reason))]
1022 InvalidMeta {
1023 reason: String,
1024 #[snafu(implicit)]
1025 location: Location,
1026 },
1027
1028 #[snafu(display("Failed to ser/de json object"))]
1029 SerdeJson {
1030 #[snafu(implicit)]
1031 location: Location,
1032 #[snafu(source)]
1033 error: serde_json::Error,
1034 },
1035
1036 #[snafu(display("Invalid raw region request, err: {}", err))]
1037 InvalidRawRegionRequest {
1038 err: String,
1039 #[snafu(implicit)]
1040 location: Location,
1041 },
1042
1043 #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1044 InvalidRegionRequest {
1045 region_id: RegionId,
1046 err: String,
1047 #[snafu(implicit)]
1048 location: Location,
1049 },
1050
1051 #[snafu(display("Unexpected schema error during project"))]
1052 SchemaProject {
1053 origin_schema: SchemaRef,
1054 projection: Vec<ColumnId>,
1055 #[snafu(implicit)]
1056 location: Location,
1057 source: datatypes::Error,
1058 },
1059
1060 #[snafu(display("Time index column not found"))]
1061 TimeIndexNotFound {
1062 #[snafu(implicit)]
1063 location: Location,
1064 },
1065
1066 #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1067 ChangeColumnNotFound {
1068 column_name: String,
1069 region_id: RegionId,
1070 #[snafu(implicit)]
1071 location: Location,
1072 },
1073
1074 #[snafu(display("Failed to convert column schema"))]
1075 ConvertColumnSchema {
1076 source: api::error::Error,
1077 #[snafu(implicit)]
1078 location: Location,
1079 },
1080
1081 #[snafu(display("Failed to convert TimeRanges"))]
1082 ConvertTimeRanges {
1083 source: api::error::Error,
1084 #[snafu(implicit)]
1085 location: Location,
1086 },
1087
1088 #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1089 InvalidSetRegionOptionRequest {
1090 key: String,
1091 value: String,
1092 #[snafu(implicit)]
1093 location: Location,
1094 },
1095
1096 #[snafu(display("Invalid set region option request, key: {}", key))]
1097 InvalidUnsetRegionOptionRequest {
1098 key: String,
1099 #[snafu(implicit)]
1100 location: Location,
1101 },
1102
1103 #[snafu(display("Failed to decode protobuf"))]
1104 DecodeProto {
1105 #[snafu(source)]
1106 error: prost::UnknownEnumValue,
1107 #[snafu(implicit)]
1108 location: Location,
1109 },
1110
1111 #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1112 InvalidColumnOption {
1113 column_name: String,
1114 msg: String,
1115 #[snafu(implicit)]
1116 location: Location,
1117 },
1118
1119 #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1120 SetFulltextOptions {
1121 column_name: String,
1122 source: datatypes::Error,
1123 #[snafu(implicit)]
1124 location: Location,
1125 },
1126
1127 #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1128 GetFulltextOptions {
1129 column_name: String,
1130 source: datatypes::Error,
1131 #[snafu(implicit)]
1132 location: Location,
1133 },
1134
1135 #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1136 SetSkippingIndexOptions {
1137 column_name: String,
1138 source: datatypes::Error,
1139 #[snafu(implicit)]
1140 location: Location,
1141 },
1142
1143 #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1144 UnsetSkippingIndexOptions {
1145 column_name: String,
1146 source: datatypes::Error,
1147 #[snafu(implicit)]
1148 location: Location,
1149 },
1150
1151 #[snafu(display("Failed to decode arrow ipc record batches"))]
1152 DecodeArrowIpc {
1153 #[snafu(source)]
1154 error: arrow::error::ArrowError,
1155 #[snafu(implicit)]
1156 location: Location,
1157 },
1158
1159 #[snafu(display("Failed to cast default value, reason: {}", reason))]
1160 CastDefaultValue {
1161 reason: String,
1162 source: datatypes::Error,
1163 #[snafu(implicit)]
1164 location: Location,
1165 },
1166
1167 #[snafu(display("Unexpected: {}", reason))]
1168 Unexpected {
1169 reason: String,
1170 #[snafu(implicit)]
1171 location: Location,
1172 },
1173
1174 #[snafu(display("Failed to encode/decode flight message"))]
1175 FlightCodec {
1176 source: common_grpc::Error,
1177 #[snafu(implicit)]
1178 location: Location,
1179 },
1180
1181 #[snafu(display("Invalid index option"))]
1182 InvalidIndexOption {
1183 #[snafu(implicit)]
1184 location: Location,
1185 #[snafu(source)]
1186 error: datatypes::error::Error,
1187 },
1188
1189 #[snafu(display("Sql common error"))]
1190 SqlCommon {
1191 source: common_sql::error::Error,
1192 #[snafu(implicit)]
1193 location: Location,
1194 },
1195}
1196
1197impl ErrorExt for MetadataError {
1198 fn status_code(&self) -> StatusCode {
1199 match self {
1200 Self::SqlCommon { source, .. } => source.status_code(),
1201 _ => StatusCode::InvalidArguments,
1202 }
1203 }
1204
1205 fn as_any(&self) -> &dyn Any {
1206 self
1207 }
1208}
1209
1210fn set_column_fulltext_options(
1219 column_meta: &mut ColumnMetadata,
1220 column_name: &str,
1221 options: &FulltextOptions,
1222 current_options: Option<FulltextOptions>,
1223) -> Result<()> {
1224 if let Some(current_options) = current_options {
1225 ensure!(
1226 current_options.analyzer == options.analyzer
1227 && current_options.case_sensitive == options.case_sensitive,
1228 InvalidColumnOptionSnafu {
1229 column_name,
1230 msg: format!(
1231 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1232 current_options.analyzer, current_options.case_sensitive
1233 ),
1234 }
1235 );
1236 }
1237
1238 column_meta
1239 .column_schema
1240 .set_fulltext_options(options)
1241 .context(SetFulltextOptionsSnafu { column_name })?;
1242
1243 Ok(())
1244}
1245
1246fn unset_column_fulltext_options(
1247 column_meta: &mut ColumnMetadata,
1248 column_name: &str,
1249 current_options: Option<FulltextOptions>,
1250) -> Result<()> {
1251 if let Some(mut current_options) = current_options
1252 && current_options.enable
1253 {
1254 current_options.enable = false;
1255 column_meta
1256 .column_schema
1257 .set_fulltext_options(¤t_options)
1258 .context(SetFulltextOptionsSnafu { column_name })?;
1259 } else {
1260 return InvalidColumnOptionSnafu {
1261 column_name,
1262 msg: "FULLTEXT index already disabled",
1263 }
1264 .fail();
1265 }
1266
1267 Ok(())
1268}
1269
1270#[cfg(test)]
1271mod test {
1272 use datatypes::prelude::ConcreteDataType;
1273 use datatypes::schema::{
1274 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1275 };
1276 use datatypes::value::Value;
1277
1278 use super::*;
1279
1280 fn create_builder() -> RegionMetadataBuilder {
1281 RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1282 }
1283
1284 fn build_test_region_metadata() -> RegionMetadata {
1285 let mut builder = create_builder();
1286 builder
1287 .push_column_metadata(ColumnMetadata {
1288 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1289 semantic_type: SemanticType::Tag,
1290 column_id: 1,
1291 })
1292 .push_column_metadata(ColumnMetadata {
1293 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1294 semantic_type: SemanticType::Field,
1295 column_id: 2,
1296 })
1297 .push_column_metadata(ColumnMetadata {
1298 column_schema: ColumnSchema::new(
1299 "c",
1300 ConcreteDataType::timestamp_millisecond_datatype(),
1301 false,
1302 ),
1303 semantic_type: SemanticType::Timestamp,
1304 column_id: 3,
1305 })
1306 .primary_key(vec![1])
1307 .partition_expr_json(Some("".to_string()));
1308 builder.build().unwrap()
1309 }
1310
/// The time index and id lookups resolve to the expected columns.
#[test]
fn test_region_metadata() {
    let metadata = build_test_region_metadata();

    let time_index = metadata.time_index_column();
    assert_eq!("c", time_index.column_schema.name);

    let column_a = metadata.column_by_id(1).unwrap();
    assert_eq!("a", column_a.column_schema.name);

    assert!(metadata.column_by_id(10).is_none());
}
1321
1322 #[test]
1323 fn test_region_metadata_serde() {
1324 let region_metadata = build_test_region_metadata();
1325 let serialized = serde_json::to_string(®ion_metadata).unwrap();
1326 let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1327 assert_eq!(region_metadata, deserialized);
1328 }
1329
1330 #[test]
1331 fn test_column_metadata_validate() {
1332 let mut builder = create_builder();
1333 let col = ColumnMetadata {
1334 column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1335 semantic_type: SemanticType::Timestamp,
1336 column_id: 1,
1337 };
1338
1339 builder.push_column_metadata(col);
1340 let err = builder.build().unwrap_err();
1341 assert!(
1342 err.to_string()
1343 .contains("column `ts` is not timestamp type"),
1344 "unexpected err: {err}",
1345 );
1346 }
1347
1348 #[test]
1349 fn test_empty_region_metadata() {
1350 let builder = create_builder();
1351 let err = builder.build().unwrap_err();
1352 assert!(
1354 err.to_string().contains("time index not found"),
1355 "unexpected err: {err}",
1356 );
1357 }
1358
1359 #[test]
1360 fn test_same_column_id() {
1361 let mut builder = create_builder();
1362 builder
1363 .push_column_metadata(ColumnMetadata {
1364 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1365 semantic_type: SemanticType::Tag,
1366 column_id: 1,
1367 })
1368 .push_column_metadata(ColumnMetadata {
1369 column_schema: ColumnSchema::new(
1370 "b",
1371 ConcreteDataType::timestamp_millisecond_datatype(),
1372 false,
1373 ),
1374 semantic_type: SemanticType::Timestamp,
1375 column_id: 1,
1376 });
1377 let err = builder.build().unwrap_err();
1378 assert!(
1379 err.to_string()
1380 .contains("column a and b have the same column id"),
1381 "unexpected err: {err}",
1382 );
1383 }
1384
1385 #[test]
1386 fn test_duplicate_time_index() {
1387 let mut builder = create_builder();
1388 builder
1389 .push_column_metadata(ColumnMetadata {
1390 column_schema: ColumnSchema::new(
1391 "a",
1392 ConcreteDataType::timestamp_millisecond_datatype(),
1393 false,
1394 ),
1395 semantic_type: SemanticType::Timestamp,
1396 column_id: 1,
1397 })
1398 .push_column_metadata(ColumnMetadata {
1399 column_schema: ColumnSchema::new(
1400 "b",
1401 ConcreteDataType::timestamp_millisecond_datatype(),
1402 false,
1403 ),
1404 semantic_type: SemanticType::Timestamp,
1405 column_id: 2,
1406 });
1407 let err = builder.build().unwrap_err();
1408 assert!(
1409 err.to_string().contains("expect only one time index"),
1410 "unexpected err: {err}",
1411 );
1412 }
1413
1414 #[test]
1415 fn test_unknown_primary_key() {
1416 let mut builder = create_builder();
1417 builder
1418 .push_column_metadata(ColumnMetadata {
1419 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1420 semantic_type: SemanticType::Tag,
1421 column_id: 1,
1422 })
1423 .push_column_metadata(ColumnMetadata {
1424 column_schema: ColumnSchema::new(
1425 "b",
1426 ConcreteDataType::timestamp_millisecond_datatype(),
1427 false,
1428 ),
1429 semantic_type: SemanticType::Timestamp,
1430 column_id: 2,
1431 })
1432 .primary_key(vec![3]);
1433 let err = builder.build().unwrap_err();
1434 assert!(
1435 err.to_string().contains("unknown column id 3"),
1436 "unexpected err: {err}",
1437 );
1438 }
1439
1440 #[test]
1441 fn test_same_primary_key() {
1442 let mut builder = create_builder();
1443 builder
1444 .push_column_metadata(ColumnMetadata {
1445 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1446 semantic_type: SemanticType::Tag,
1447 column_id: 1,
1448 })
1449 .push_column_metadata(ColumnMetadata {
1450 column_schema: ColumnSchema::new(
1451 "b",
1452 ConcreteDataType::timestamp_millisecond_datatype(),
1453 false,
1454 ),
1455 semantic_type: SemanticType::Timestamp,
1456 column_id: 2,
1457 })
1458 .primary_key(vec![1, 1]);
1459 let err = builder.build().unwrap_err();
1460 assert!(
1461 err.to_string()
1462 .contains("duplicate column a in primary key"),
1463 "unexpected err: {err}",
1464 );
1465 }
1466
1467 #[test]
1468 fn test_in_time_index() {
1469 let mut builder = create_builder();
1470 builder
1471 .push_column_metadata(ColumnMetadata {
1472 column_schema: ColumnSchema::new(
1473 "ts",
1474 ConcreteDataType::timestamp_millisecond_datatype(),
1475 false,
1476 ),
1477 semantic_type: SemanticType::Timestamp,
1478 column_id: 1,
1479 })
1480 .primary_key(vec![1]);
1481 let err = builder.build().unwrap_err();
1482 assert!(
1483 err.to_string()
1484 .contains("column ts is already a time index column"),
1485 "unexpected err: {err}",
1486 );
1487 }
1488
1489 #[test]
1490 fn test_nullable_time_index() {
1491 let mut builder = create_builder();
1492 builder.push_column_metadata(ColumnMetadata {
1493 column_schema: ColumnSchema::new(
1494 "ts",
1495 ConcreteDataType::timestamp_millisecond_datatype(),
1496 true,
1497 ),
1498 semantic_type: SemanticType::Timestamp,
1499 column_id: 1,
1500 });
1501 let err = builder.build().unwrap_err();
1502 assert!(
1503 err.to_string()
1504 .contains("time index column ts must be NOT NULL"),
1505 "unexpected err: {err}",
1506 );
1507 }
1508
1509 #[test]
1510 fn test_primary_key_semantic_type() {
1511 let mut builder = create_builder();
1512 builder
1513 .push_column_metadata(ColumnMetadata {
1514 column_schema: ColumnSchema::new(
1515 "ts",
1516 ConcreteDataType::timestamp_millisecond_datatype(),
1517 false,
1518 ),
1519 semantic_type: SemanticType::Timestamp,
1520 column_id: 1,
1521 })
1522 .push_column_metadata(ColumnMetadata {
1523 column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1524 semantic_type: SemanticType::Field,
1525 column_id: 2,
1526 })
1527 .primary_key(vec![2]);
1528 let err = builder.build().unwrap_err();
1529 assert!(
1530 err.to_string()
1531 .contains("semantic type of column a should be Tag, not Field"),
1532 "unexpected err: {err}",
1533 );
1534 }
1535
1536 #[test]
1537 fn test_primary_key_tag_num() {
1538 let mut builder = create_builder();
1539 builder
1540 .push_column_metadata(ColumnMetadata {
1541 column_schema: ColumnSchema::new(
1542 "ts",
1543 ConcreteDataType::timestamp_millisecond_datatype(),
1544 false,
1545 ),
1546 semantic_type: SemanticType::Timestamp,
1547 column_id: 1,
1548 })
1549 .push_column_metadata(ColumnMetadata {
1550 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1551 semantic_type: SemanticType::Tag,
1552 column_id: 2,
1553 })
1554 .push_column_metadata(ColumnMetadata {
1555 column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1556 semantic_type: SemanticType::Tag,
1557 column_id: 3,
1558 })
1559 .primary_key(vec![2]);
1560 let err = builder.build().unwrap_err();
1561 assert!(
1562 err.to_string()
1563 .contains("number of primary key columns 1 not equal to tag columns 2"),
1564 "unexpected err: {err}",
1565 );
1566 }
1567
1568 #[test]
1569 fn test_bump_version() {
1570 let mut region_metadata = build_test_region_metadata();
1571 let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1572 builder.bump_version();
1573 let new_meta = builder.build().unwrap();
1574 region_metadata.schema_version += 1;
1575 assert_eq!(region_metadata, new_meta);
1576 }
1577
1578 fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1579 let semantic_type = if is_tag {
1580 SemanticType::Tag
1581 } else {
1582 SemanticType::Field
1583 };
1584 ColumnMetadata {
1585 column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1586 semantic_type,
1587 column_id,
1588 }
1589 }
1590
1591 fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1592 let actual: Vec<_> = metadata
1593 .column_metadatas
1594 .iter()
1595 .map(|col| &col.column_schema.name)
1596 .collect();
1597 assert_eq!(names, actual);
1598 }
1599
1600 fn get_columns_default_constraint(
1601 metadata: &RegionMetadata,
1602 name: String,
1603 ) -> Option<Option<&ColumnDefaultConstraint>> {
1604 metadata.column_metadatas.iter().find_map(|col| {
1605 if col.column_schema.name == name {
1606 Some(col.column_schema.default_constraint())
1607 } else {
1608 None
1609 }
1610 })
1611 }
1612
1613 #[test]
1614 fn test_alter() {
1615 let metadata = build_test_region_metadata();
1617 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1618 builder
1620 .alter(AlterKind::AddColumns {
1621 columns: vec![AddColumn {
1622 column_metadata: new_column_metadata("d", true, 4),
1623 location: None,
1624 }],
1625 })
1626 .unwrap();
1627 let metadata = builder.build().unwrap();
1628 check_columns(&metadata, &["a", "b", "c", "d"]);
1629 assert_eq!([1, 4], &metadata.primary_key[..]);
1630
1631 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1632 builder
1633 .alter(AlterKind::AddColumns {
1634 columns: vec![AddColumn {
1635 column_metadata: new_column_metadata("e", false, 5),
1636 location: Some(AddColumnLocation::First),
1637 }],
1638 })
1639 .unwrap();
1640 let metadata = builder.build().unwrap();
1641 check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1642
1643 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1644 builder
1645 .alter(AlterKind::AddColumns {
1646 columns: vec![AddColumn {
1647 column_metadata: new_column_metadata("f", false, 6),
1648 location: Some(AddColumnLocation::After {
1649 column_name: "b".to_string(),
1650 }),
1651 }],
1652 })
1653 .unwrap();
1654 let metadata = builder.build().unwrap();
1655 check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1656
1657 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1658 builder
1659 .alter(AlterKind::AddColumns {
1660 columns: vec![AddColumn {
1661 column_metadata: new_column_metadata("g", false, 7),
1662 location: Some(AddColumnLocation::After {
1663 column_name: "d".to_string(),
1664 }),
1665 }],
1666 })
1667 .unwrap();
1668 let metadata = builder.build().unwrap();
1669 check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1670
1671 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1672 builder
1673 .alter(AlterKind::DropColumns {
1674 names: vec!["g".to_string(), "e".to_string()],
1675 })
1676 .unwrap();
1677 let metadata = builder.build().unwrap();
1678 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1679
1680 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1681 builder
1682 .alter(AlterKind::DropColumns {
1683 names: vec!["a".to_string()],
1684 })
1685 .unwrap();
1686 let err = builder.build().unwrap_err();
1688 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1689
1690 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1691 let mut column_metadata = new_column_metadata("g", false, 8);
1692 let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1693 column_metadata.column_schema = column_metadata
1694 .column_schema
1695 .with_default_constraint(default_constraint.clone())
1696 .unwrap();
1697 builder
1698 .alter(AlterKind::AddColumns {
1699 columns: vec![AddColumn {
1700 column_metadata,
1701 location: None,
1702 }],
1703 })
1704 .unwrap();
1705 let metadata = builder.build().unwrap();
1706 assert_eq!(
1707 get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1708 default_constraint.as_ref()
1709 );
1710 check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1711
1712 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1713 builder
1714 .alter(AlterKind::DropDefaults {
1715 names: vec!["g".to_string()],
1716 })
1717 .unwrap();
1718 let metadata = builder.build().unwrap();
1719 assert_eq!(
1720 get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1721 None
1722 );
1723 check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1724
1725 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1726 builder
1727 .alter(AlterKind::DropColumns {
1728 names: vec!["g".to_string()],
1729 })
1730 .unwrap();
1731 let metadata = builder.build().unwrap();
1732 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1733
1734 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1735 builder
1736 .alter(AlterKind::ModifyColumnTypes {
1737 columns: vec![ModifyColumnType {
1738 column_name: "b".to_string(),
1739 target_type: ConcreteDataType::string_datatype(),
1740 }],
1741 })
1742 .unwrap();
1743 let metadata = builder.build().unwrap();
1744 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1745 let b_type = &metadata
1746 .column_by_name("b")
1747 .unwrap()
1748 .column_schema
1749 .data_type;
1750 assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1751
1752 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1753 builder
1754 .alter(AlterKind::SetIndexes {
1755 options: vec![SetIndexOption::Fulltext {
1756 column_name: "b".to_string(),
1757 options: FulltextOptions::new_unchecked(
1758 true,
1759 FulltextAnalyzer::Chinese,
1760 true,
1761 FulltextBackend::Bloom,
1762 1000,
1763 0.01,
1764 ),
1765 }],
1766 })
1767 .unwrap();
1768 let metadata = builder.build().unwrap();
1769 let a_fulltext_options = metadata
1770 .column_by_name("b")
1771 .unwrap()
1772 .column_schema
1773 .fulltext_options()
1774 .unwrap()
1775 .unwrap();
1776 assert!(a_fulltext_options.enable);
1777 assert_eq!(
1778 datatypes::schema::FulltextAnalyzer::Chinese,
1779 a_fulltext_options.analyzer
1780 );
1781 assert!(a_fulltext_options.case_sensitive);
1782
1783 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1784 builder
1785 .alter(AlterKind::UnsetIndexes {
1786 options: vec![UnsetIndexOption::Fulltext {
1787 column_name: "b".to_string(),
1788 }],
1789 })
1790 .unwrap();
1791 let metadata = builder.build().unwrap();
1792 let a_fulltext_options = metadata
1793 .column_by_name("b")
1794 .unwrap()
1795 .column_schema
1796 .fulltext_options()
1797 .unwrap()
1798 .unwrap();
1799 assert!(!a_fulltext_options.enable);
1800 assert_eq!(
1801 datatypes::schema::FulltextAnalyzer::Chinese,
1802 a_fulltext_options.analyzer
1803 );
1804 assert!(a_fulltext_options.case_sensitive);
1805 }
1806
1807 #[test]
1808 fn test_add_if_not_exists() {
1809 let metadata = build_test_region_metadata();
1811 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1812 builder
1814 .alter(AlterKind::AddColumns {
1815 columns: vec![
1816 AddColumn {
1817 column_metadata: new_column_metadata("d", true, 4),
1818 location: None,
1819 },
1820 AddColumn {
1821 column_metadata: new_column_metadata("d", true, 4),
1822 location: None,
1823 },
1824 ],
1825 })
1826 .unwrap();
1827 let metadata = builder.build().unwrap();
1828 check_columns(&metadata, &["a", "b", "c", "d"]);
1829 assert_eq!([1, 4], &metadata.primary_key[..]);
1830
1831 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1832 builder
1834 .alter(AlterKind::AddColumns {
1835 columns: vec![AddColumn {
1836 column_metadata: new_column_metadata("b", false, 2),
1837 location: None,
1838 }],
1839 })
1840 .unwrap();
1841 let metadata = builder.build().unwrap();
1842 check_columns(&metadata, &["a", "b", "c", "d"]);
1843 }
1844
1845 #[test]
1846 fn test_add_column_with_inverted_index() {
1847 let metadata = build_test_region_metadata();
1851 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1852 let mut col = new_column_metadata("d", true, 4);
1854 col.column_schema.set_inverted_index(true);
1855 builder
1856 .alter(AlterKind::AddColumns {
1857 columns: vec![
1858 AddColumn {
1859 column_metadata: col,
1860 location: None,
1861 },
1862 AddColumn {
1863 column_metadata: new_column_metadata("e", true, 5),
1864 location: None,
1865 },
1866 ],
1867 })
1868 .unwrap();
1869 let metadata = builder.build().unwrap();
1870 check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1871 assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1872 let column_metadata = metadata.column_by_name("a").unwrap();
1873 assert!(!column_metadata.column_schema.is_inverted_indexed());
1874 let column_metadata = metadata.column_by_name("b").unwrap();
1875 assert!(!column_metadata.column_schema.is_inverted_indexed());
1876 let column_metadata = metadata.column_by_name("c").unwrap();
1877 assert!(!column_metadata.column_schema.is_inverted_indexed());
1878 let column_metadata = metadata.column_by_name("d").unwrap();
1879 assert!(column_metadata.column_schema.is_inverted_indexed());
1880 let column_metadata = metadata.column_by_name("e").unwrap();
1881 assert!(!column_metadata.column_schema.is_inverted_indexed());
1882 }
1883
1884 #[test]
1885 fn test_drop_if_exists() {
1886 let metadata = build_test_region_metadata();
1888 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1889 builder
1891 .alter(AlterKind::AddColumns {
1892 columns: vec![
1893 AddColumn {
1894 column_metadata: new_column_metadata("d", false, 4),
1895 location: None,
1896 },
1897 AddColumn {
1898 column_metadata: new_column_metadata("e", false, 5),
1899 location: None,
1900 },
1901 ],
1902 })
1903 .unwrap();
1904 let metadata = builder.build().unwrap();
1905 check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1906
1907 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1908 builder
1909 .alter(AlterKind::DropColumns {
1910 names: vec!["b".to_string(), "b".to_string()],
1911 })
1912 .unwrap();
1913 let metadata = builder.build().unwrap();
1914 check_columns(&metadata, &["a", "c", "d", "e"]);
1915
1916 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1917 builder
1918 .alter(AlterKind::DropColumns {
1919 names: vec!["b".to_string(), "e".to_string()],
1920 })
1921 .unwrap();
1922 let metadata = builder.build().unwrap();
1923 check_columns(&metadata, &["a", "c", "d"]);
1924 }
1925
1926 #[test]
1927 fn test_invalid_column_name() {
1928 let mut builder = create_builder();
1929 builder.push_column_metadata(ColumnMetadata {
1930 column_schema: ColumnSchema::new(
1931 "__sequence",
1932 ConcreteDataType::timestamp_millisecond_datatype(),
1933 false,
1934 ),
1935 semantic_type: SemanticType::Timestamp,
1936 column_id: 1,
1937 });
1938 let err = builder.build().unwrap_err();
1939 assert!(
1940 err.to_string()
1941 .contains("internal column name that can not be used"),
1942 "unexpected err: {err}",
1943 );
1944 }
1945
1946 #[test]
1947 fn test_allow_internal_column_name() {
1948 let mut builder = create_builder();
1949 builder
1950 .push_column_metadata(ColumnMetadata {
1951 column_schema: ColumnSchema::new(
1952 "__primary_key",
1953 ConcreteDataType::string_datatype(),
1954 false,
1955 ),
1956 semantic_type: SemanticType::Tag,
1957 column_id: 1,
1958 })
1959 .push_column_metadata(ColumnMetadata {
1960 column_schema: ColumnSchema::new(
1961 "ts",
1962 ConcreteDataType::timestamp_millisecond_datatype(),
1963 false,
1964 ),
1965 semantic_type: SemanticType::Timestamp,
1966 column_id: 2,
1967 })
1968 .primary_key(vec![1]);
1969
1970 let metadata = builder.build_without_validation().unwrap();
1971 assert_eq!(
1972 "__primary_key",
1973 metadata.column_metadatas[0].column_schema.name
1974 );
1975 }
1976
1977 #[test]
1978 fn test_build_without_validation() {
1979 let mut builder = create_builder();
1981 builder
1982 .push_column_metadata(ColumnMetadata {
1983 column_schema: ColumnSchema::new(
1984 "ts",
1985 ConcreteDataType::timestamp_millisecond_datatype(),
1986 false,
1987 ),
1988 semantic_type: SemanticType::Timestamp,
1989 column_id: 1,
1990 })
1991 .push_column_metadata(ColumnMetadata {
1992 column_schema: ColumnSchema::new(
1993 "field",
1994 ConcreteDataType::string_datatype(),
1995 true,
1996 ),
1997 semantic_type: SemanticType::Field,
1998 column_id: 2,
1999 })
2000 .primary_key(vec![2]);
2001
2002 let metadata = builder.build_without_validation().unwrap();
2004 assert_eq!(vec![2], metadata.primary_key);
2005
2006 let mut builder = create_builder();
2008 builder
2009 .push_column_metadata(ColumnMetadata {
2010 column_schema: ColumnSchema::new(
2011 "ts",
2012 ConcreteDataType::timestamp_millisecond_datatype(),
2013 false,
2014 ),
2015 semantic_type: SemanticType::Timestamp,
2016 column_id: 1,
2017 })
2018 .push_column_metadata(ColumnMetadata {
2019 column_schema: ColumnSchema::new(
2020 "field",
2021 ConcreteDataType::string_datatype(),
2022 true,
2023 ),
2024 semantic_type: SemanticType::Field,
2025 column_id: 2,
2026 })
2027 .primary_key(vec![2]);
2028 let err = builder.build().unwrap_err();
2029 assert!(
2030 err.to_string()
2031 .contains("semantic type of column field should be Tag"),
2032 "unexpected err: {err}"
2033 );
2034 }
2035
2036 #[test]
2037 fn test_debug_for_column_metadata() {
2038 let region_metadata = build_test_region_metadata();
2039 let formatted = format!("{:?}", region_metadata);
2040 assert_eq!(
2041 formatted,
2042 "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
2043 );
2044 }
2045
2046 #[test]
2047 fn test_region_metadata_deserialize_default_primary_key_encoding() {
2048 let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
2049 let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2050 assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
2051
2052 let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
2053 let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2054 assert_eq!(
2055 deserialized.primary_key_encoding,
2056 PrimaryKeyEncoding::Sparse
2057 );
2058 }
2059}