1use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::column_def::try_as_column_schema;
25use api::v1::region::RegionColumnDef;
26use api::v1::SemanticType;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef};
33use datatypes::types::TimestampType;
34use serde::de::Error;
35use serde::{Deserialize, Deserializer, Serialize};
36use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
37
38use crate::codec::PrimaryKeyEncoding;
39use crate::region_request::{
40 AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
41};
42use crate::storage::consts::is_internal_column;
43use crate::storage::{ColumnId, RegionId};
44
45pub type Result<T> = std::result::Result<T, MetadataError>;
46
47#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
49pub struct ColumnMetadata {
50 pub column_schema: ColumnSchema,
52 pub semantic_type: SemanticType,
54 pub column_id: ColumnId,
56}
57
58impl fmt::Debug for ColumnMetadata {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 write!(
61 f,
62 "[{:?} {:?} {:?}]",
63 self.column_schema, self.semantic_type, self.column_id,
64 )
65 }
66}
67
68impl ColumnMetadata {
69 pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
71 let column_id = column_def.column_id;
72 let column_def = column_def
73 .column_def
74 .context(InvalidRawRegionRequestSnafu {
75 err: "column_def is absent",
76 })?;
77 let semantic_type = column_def.semantic_type();
78 let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
79
80 Ok(Self {
81 column_schema,
82 semantic_type,
83 column_id,
84 })
85 }
86
87 pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
89 serde_json::to_vec(columns)
90 }
91
92 pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
94 serde_json::from_slice(bytes)
95 }
96
97 pub fn is_same_datatype(&self, other: &Self) -> bool {
98 self.column_schema.data_type == other.column_schema.data_type
99 }
100}
101
102#[cfg_attr(doc, aquamarine::aquamarine)]
103#[derive(Clone, PartialEq, Eq, Serialize)]
127pub struct RegionMetadata {
128 #[serde(skip)]
130 pub schema: SchemaRef,
131
132 #[serde(skip)]
136 time_index: ColumnId,
137 #[serde(skip)]
139 id_to_index: HashMap<ColumnId, usize>,
140
141 pub column_metadatas: Vec<ColumnMetadata>,
144 pub primary_key: Vec<ColumnId>,
146
147 pub region_id: RegionId,
149 pub schema_version: u64,
153
154 pub primary_key_encoding: PrimaryKeyEncoding,
156}
157
158impl fmt::Debug for RegionMetadata {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 f.debug_struct("RegionMetadata")
161 .field("column_metadatas", &self.column_metadatas)
162 .field("time_index", &self.time_index)
163 .field("primary_key", &self.primary_key)
164 .field("region_id", &self.region_id)
165 .field("schema_version", &self.schema_version)
166 .finish()
167 }
168}
169
170pub type RegionMetadataRef = Arc<RegionMetadata>;
171
172impl<'de> Deserialize<'de> for RegionMetadata {
173 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
174 where
175 D: Deserializer<'de>,
176 {
177 #[derive(Deserialize)]
179 struct RegionMetadataWithoutSchema {
180 column_metadatas: Vec<ColumnMetadata>,
181 primary_key: Vec<ColumnId>,
182 region_id: RegionId,
183 schema_version: u64,
184 #[serde(default)]
185 primary_key_encoding: PrimaryKeyEncoding,
186 }
187
188 let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
189 let skipped =
190 SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
191
192 Ok(Self {
193 schema: skipped.schema,
194 time_index: skipped.time_index,
195 id_to_index: skipped.id_to_index,
196 column_metadatas: without_schema.column_metadatas,
197 primary_key: without_schema.primary_key,
198 region_id: without_schema.region_id,
199 schema_version: without_schema.schema_version,
200 primary_key_encoding: without_schema.primary_key_encoding,
201 })
202 }
203}
204
205impl RegionMetadata {
206 pub fn from_json(s: &str) -> Result<Self> {
208 serde_json::from_str(s).context(SerdeJsonSnafu)
209 }
210
211 pub fn to_json(&self) -> Result<String> {
213 serde_json::to_string(&self).context(SerdeJsonSnafu)
214 }
215
216 pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
218 self.id_to_index
219 .get(&column_id)
220 .map(|index| &self.column_metadatas[*index])
221 }
222
223 pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
225 self.id_to_index.get(&column_id).copied()
226 }
227
228 pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
230 self.column_metadatas
231 .iter()
232 .position(|col| col.column_schema.name == column_name)
233 }
234
235 pub fn time_index_column(&self) -> &ColumnMetadata {
240 let index = self.id_to_index[&self.time_index];
241 &self.column_metadatas[index]
242 }
243
244 pub fn time_index_type(&self) -> TimestampType {
249 let index = self.id_to_index[&self.time_index];
250 self.column_metadatas[index]
251 .column_schema
252 .data_type
253 .as_timestamp()
254 .unwrap()
255 }
256
257 pub fn time_index_column_pos(&self) -> usize {
259 self.id_to_index[&self.time_index]
260 }
261
262 pub fn time_index_field(&self) -> FieldRef {
264 let index = self.id_to_index[&self.time_index];
265 self.schema.arrow_schema().fields[index].clone()
266 }
267
268 pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
270 self.schema
271 .column_index_by_name(name)
272 .map(|index| &self.column_metadatas[index])
273 }
274
275 pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
277 self.primary_key
279 .iter()
280 .map(|id| self.column_by_id(*id).unwrap())
281 }
282
283 pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288 self.column_metadatas
289 .iter()
290 .filter(|column| column.semantic_type == SemanticType::Field)
291 }
292
293 pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
297 self.primary_key.iter().position(|id| *id == column_id)
298 }
299
300 pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
304 ensure!(
306 projection.contains(&self.time_index),
307 TimeIndexNotFoundSnafu
308 );
309
310 let indices_to_preserve = projection
312 .iter()
313 .map(|id| {
314 self.column_index_by_id(*id)
315 .with_context(|| InvalidRegionRequestSnafu {
316 region_id: self.region_id,
317 err: format!("column id {} not found", id),
318 })
319 })
320 .collect::<Result<Vec<_>>>()?;
321
322 let projected_schema =
324 self.schema
325 .try_project(&indices_to_preserve)
326 .with_context(|_| SchemaProjectSnafu {
327 origin_schema: self.schema.clone(),
328 projection: projection.to_vec(),
329 })?;
330
331 let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
333 let mut projected_primary_key = vec![];
334 let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
335 for index in indices_to_preserve {
336 let col = self.column_metadatas[index].clone();
337 if col.semantic_type == SemanticType::Tag {
338 projected_primary_key.push(col.column_id);
339 }
340 projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
341 projected_column_metadatas.push(col);
342 }
343
344 Ok(RegionMetadata {
345 schema: Arc::new(projected_schema),
346 time_index: self.time_index,
347 id_to_index: projected_id_to_index,
348 column_metadatas: projected_column_metadatas,
349 primary_key: projected_primary_key,
350 region_id: self.region_id,
351 schema_version: self.schema_version,
352 primary_key_encoding: self.primary_key_encoding,
353 })
354 }
355
356 pub fn inverted_indexed_column_ids<'a>(
358 &self,
359 ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
360 ) -> HashSet<ColumnId> {
361 let mut inverted_index = self
362 .column_metadatas
363 .iter()
364 .filter(|column| column.column_schema.is_inverted_indexed())
365 .map(|column| column.column_id)
366 .collect::<HashSet<_>>();
367
368 for ignored in ignore_column_ids {
369 inverted_index.remove(ignored);
370 }
371
372 inverted_index
373 }
374
375 fn validate(&self) -> Result<()> {
377 let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
379 for col in &self.column_metadatas {
380 Self::validate_column_metadata(col)?;
382
383 ensure!(
386 !id_names.contains_key(&col.column_id),
387 InvalidMetaSnafu {
388 reason: format!(
389 "column {} and {} have the same column id {}",
390 id_names[&col.column_id], col.column_schema.name, col.column_id,
391 ),
392 }
393 );
394 id_names.insert(col.column_id, &col.column_schema.name);
395 }
396
397 let num_time_index = self
399 .column_metadatas
400 .iter()
401 .filter(|col| col.semantic_type == SemanticType::Timestamp)
402 .count();
403 ensure!(
404 num_time_index == 1,
405 InvalidMetaSnafu {
406 reason: format!("expect only one time index, found {}", num_time_index),
407 }
408 );
409
410 ensure!(
412 !self.time_index_column().column_schema.is_nullable(),
413 InvalidMetaSnafu {
414 reason: format!(
415 "time index column {} must be NOT NULL",
416 self.time_index_column().column_schema.name
417 ),
418 }
419 );
420
421 if !self.primary_key.is_empty() {
422 let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
423 for column_id in &self.primary_key {
425 ensure!(
427 id_names.contains_key(column_id),
428 InvalidMetaSnafu {
429 reason: format!("unknown column id {}", column_id),
430 }
431 );
432
433 let column = self.column_by_id(*column_id).unwrap();
435 ensure!(
437 !pk_ids.contains(&column_id),
438 InvalidMetaSnafu {
439 reason: format!(
440 "duplicate column {} in primary key",
441 column.column_schema.name
442 ),
443 }
444 );
445
446 ensure!(
448 *column_id != self.time_index,
449 InvalidMetaSnafu {
450 reason: format!(
451 "column {} is already a time index column",
452 column.column_schema.name,
453 ),
454 }
455 );
456
457 ensure!(
459 column.semantic_type == SemanticType::Tag,
460 InvalidMetaSnafu {
461 reason: format!(
462 "semantic type of column {} should be Tag, not {:?}",
463 column.column_schema.name, column.semantic_type
464 ),
465 }
466 );
467
468 pk_ids.insert(column_id);
469 }
470 }
471
472 let num_tag = self
474 .column_metadatas
475 .iter()
476 .filter(|col| col.semantic_type == SemanticType::Tag)
477 .count();
478 ensure!(
479 num_tag == self.primary_key.len(),
480 InvalidMetaSnafu {
481 reason: format!(
482 "number of primary key columns {} not equal to tag columns {}",
483 self.primary_key.len(),
484 num_tag
485 ),
486 }
487 );
488
489 Ok(())
490 }
491
492 fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
494 if column_metadata.semantic_type == SemanticType::Timestamp {
495 ensure!(
496 column_metadata.column_schema.data_type.is_timestamp(),
497 InvalidMetaSnafu {
498 reason: format!(
499 "column `{}` is not timestamp type",
500 column_metadata.column_schema.name
501 ),
502 }
503 );
504 }
505
506 ensure!(
507 !is_internal_column(&column_metadata.column_schema.name),
508 InvalidMetaSnafu {
509 reason: format!(
510 "{} is internal column name that can not be used",
511 column_metadata.column_schema.name
512 ),
513 }
514 );
515
516 Ok(())
517 }
518}
519
520pub struct RegionMetadataBuilder {
522 region_id: RegionId,
523 column_metadatas: Vec<ColumnMetadata>,
524 primary_key: Vec<ColumnId>,
525 schema_version: u64,
526 primary_key_encoding: PrimaryKeyEncoding,
527}
528
529impl RegionMetadataBuilder {
530 pub fn new(id: RegionId) -> Self {
532 Self {
533 region_id: id,
534 column_metadatas: vec![],
535 primary_key: vec![],
536 schema_version: 0,
537 primary_key_encoding: PrimaryKeyEncoding::Dense,
538 }
539 }
540
541 pub fn from_existing(existing: RegionMetadata) -> Self {
543 Self {
544 column_metadatas: existing.column_metadatas,
545 primary_key: existing.primary_key,
546 region_id: existing.region_id,
547 schema_version: existing.schema_version,
548 primary_key_encoding: existing.primary_key_encoding,
549 }
550 }
551
552 pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
554 self.primary_key_encoding = encoding;
555 self
556 }
557
558 pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
560 self.column_metadatas.push(column_metadata);
561 self
562 }
563
564 pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
566 self.primary_key = key;
567 self
568 }
569
570 pub fn bump_version(&mut self) -> &mut Self {
572 self.schema_version += 1;
573 self
574 }
575
576 pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
580 match kind {
581 AlterKind::AddColumns { columns } => self.add_columns(columns)?,
582 AlterKind::DropColumns { names } => self.drop_columns(&names),
583 AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
584 AlterKind::SetIndexes { options } => self.set_indexes(options)?,
585 AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
586 AlterKind::SetRegionOptions { options: _ } => {
587 }
589 AlterKind::UnsetRegionOptions { keys: _ } => {
590 }
592 AlterKind::DropDefaults { names } => {
593 self.drop_defaults(names)?;
594 }
595 AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
596 AlterKind::SyncColumns { column_metadatas } => {
597 self.primary_key = column_metadatas
598 .iter()
599 .filter_map(|column_metadata| {
600 if column_metadata.semantic_type == SemanticType::Tag {
601 Some(column_metadata.column_id)
602 } else {
603 None
604 }
605 })
606 .collect::<Vec<_>>();
607 self.column_metadatas = column_metadatas;
608 }
609 }
610 Ok(self)
611 }
612
613 pub fn build(self) -> Result<RegionMetadata> {
615 let skipped = SkippedFields::new(&self.column_metadatas)?;
616
617 let meta = RegionMetadata {
618 schema: skipped.schema,
619 time_index: skipped.time_index,
620 id_to_index: skipped.id_to_index,
621 column_metadatas: self.column_metadatas,
622 primary_key: self.primary_key,
623 region_id: self.region_id,
624 schema_version: self.schema_version,
625 primary_key_encoding: self.primary_key_encoding,
626 };
627
628 meta.validate()?;
629
630 Ok(meta)
631 }
632
633 fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
635 let mut names: HashSet<_> = self
636 .column_metadatas
637 .iter()
638 .map(|col| col.column_schema.name.clone())
639 .collect();
640
641 for add_column in columns {
642 if names.contains(&add_column.column_metadata.column_schema.name) {
643 continue;
645 }
646
647 let column_id = add_column.column_metadata.column_id;
648 let semantic_type = add_column.column_metadata.semantic_type;
649 let column_name = add_column.column_metadata.column_schema.name.clone();
650 match add_column.location {
651 None => {
652 self.column_metadatas.push(add_column.column_metadata);
653 }
654 Some(AddColumnLocation::First) => {
655 self.column_metadatas.insert(0, add_column.column_metadata);
656 }
657 Some(AddColumnLocation::After { column_name }) => {
658 let pos = self
659 .column_metadatas
660 .iter()
661 .position(|col| col.column_schema.name == column_name)
662 .context(InvalidRegionRequestSnafu {
663 region_id: self.region_id,
664 err: format!(
665 "column {} not found, failed to add column {} after it",
666 column_name, add_column.column_metadata.column_schema.name
667 ),
668 })?;
669 self.column_metadatas
671 .insert(pos + 1, add_column.column_metadata);
672 }
673 }
674 names.insert(column_name);
675 if semantic_type == SemanticType::Tag {
676 self.primary_key.push(column_id);
678 }
679 }
680
681 Ok(())
682 }
683
684 fn drop_columns(&mut self, names: &[String]) {
686 let name_set: HashSet<_> = names.iter().collect();
687 self.column_metadatas
688 .retain(|col| !name_set.contains(&col.column_schema.name));
689 }
690
691 fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
693 let mut change_type_map: HashMap<_, _> = columns
694 .into_iter()
695 .map(
696 |ModifyColumnType {
697 column_name,
698 target_type,
699 }| (column_name, target_type),
700 )
701 .collect();
702
703 for column_meta in self.column_metadatas.iter_mut() {
704 if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
705 column_meta.column_schema.data_type = target_type.clone();
706 let new_default =
708 if let Some(default_value) = column_meta.column_schema.default_constraint() {
709 Some(
710 default_value
711 .cast_to_datatype(&target_type)
712 .with_context(|_| CastDefaultValueSnafu {
713 reason: format!(
714 "Failed to cast default value from {:?} to type {:?}",
715 default_value, target_type
716 ),
717 })?,
718 )
719 } else {
720 None
721 };
722 column_meta.column_schema = column_meta
723 .column_schema
724 .clone()
725 .with_default_constraint(new_default.clone())
726 .with_context(|_| CastDefaultValueSnafu {
727 reason: format!("Failed to set new default: {:?}", new_default),
728 })?;
729 }
730 }
731
732 Ok(())
733 }
734
735 fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
736 let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
737 for option in &options {
738 set_index_map
739 .entry(option.column_name())
740 .or_default()
741 .push(option);
742 }
743
744 for column_metadata in self.column_metadatas.iter_mut() {
745 if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
746 for option in options {
747 Self::set_index(column_metadata, option)?;
748 }
749 }
750 }
751
752 Ok(())
753 }
754
755 fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
756 let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
757 for option in &options {
758 unset_index_map
759 .entry(option.column_name())
760 .or_default()
761 .push(option);
762 }
763
764 for column_metadata in self.column_metadatas.iter_mut() {
765 if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
766 for option in options {
767 Self::unset_index(column_metadata, option)?;
768 }
769 }
770 }
771
772 Ok(())
773 }
774
775 fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
776 match options {
777 SetIndexOption::Fulltext {
778 column_name,
779 options,
780 } => {
781 ensure!(
782 column_metadata.column_schema.data_type.is_string(),
783 InvalidColumnOptionSnafu {
784 column_name,
785 msg: "FULLTEXT index only supports string type".to_string(),
786 }
787 );
788 let current_fulltext_options = column_metadata
789 .column_schema
790 .fulltext_options()
791 .with_context(|_| GetFulltextOptionsSnafu {
792 column_name: column_name.to_string(),
793 })?;
794 set_column_fulltext_options(
795 column_metadata,
796 column_name,
797 options,
798 current_fulltext_options,
799 )?;
800 }
801 SetIndexOption::Inverted { .. } => {
802 column_metadata.column_schema.set_inverted_index(true)
803 }
804 SetIndexOption::Skipping {
805 column_name,
806 options,
807 } => {
808 column_metadata
809 .column_schema
810 .set_skipping_options(options)
811 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
812 }
813 }
814
815 Ok(())
816 }
817
818 fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
819 match options {
820 UnsetIndexOption::Fulltext { column_name } => {
821 ensure!(
822 column_metadata.column_schema.data_type.is_string(),
823 InvalidColumnOptionSnafu {
824 column_name,
825 msg: "FULLTEXT index only supports string type".to_string(),
826 }
827 );
828
829 let current_fulltext_options = column_metadata
830 .column_schema
831 .fulltext_options()
832 .with_context(|_| GetFulltextOptionsSnafu {
833 column_name: column_name.to_string(),
834 })?;
835
836 unset_column_fulltext_options(
837 column_metadata,
838 column_name,
839 current_fulltext_options,
840 )?;
841 }
842 UnsetIndexOption::Inverted { .. } => {
843 column_metadata.column_schema.set_inverted_index(false)
844 }
845 UnsetIndexOption::Skipping { column_name } => {
846 column_metadata
847 .column_schema
848 .unset_skipping_options()
849 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
850 }
851 }
852
853 Ok(())
854 }
855
856 fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
857 for name in column_names.iter() {
858 let meta = self
859 .column_metadatas
860 .iter_mut()
861 .find(|col| col.column_schema.name == *name);
862 if let Some(meta) = meta {
863 if !meta.column_schema.is_nullable() {
864 return InvalidRegionRequestSnafu {
865 region_id: self.region_id,
866 err: format!(
867 "column {name} is not nullable and `default` cannot be dropped",
868 ),
869 }
870 .fail();
871 }
872 meta.column_schema = meta
873 .column_schema
874 .clone()
875 .with_default_constraint(None)
876 .with_context(|_| CastDefaultValueSnafu {
877 reason: format!("Failed to drop default : {name:?}"),
878 })?;
879 } else {
880 return InvalidRegionRequestSnafu {
881 region_id: self.region_id,
882 err: format!("column {name} not found",),
883 }
884 .fail();
885 }
886 }
887 Ok(())
888 }
889
890 fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
891 for set_default in set_defaults.iter() {
892 let meta = self
893 .column_metadatas
894 .iter_mut()
895 .find(|col| col.column_schema.name == set_default.name);
896 if let Some(meta) = meta {
897 let default_constraint = common_sql::convert::deserialize_default_constraint(
898 set_default.default_constraint.as_slice(),
899 &meta.column_schema.name,
900 &meta.column_schema.data_type,
901 )
902 .context(SqlCommonSnafu)?;
903
904 meta.column_schema = meta
905 .column_schema
906 .clone()
907 .with_default_constraint(default_constraint)
908 .with_context(|_| CastDefaultValueSnafu {
909 reason: format!("Failed to set default : {set_default:?}"),
910 })?;
911 } else {
912 return InvalidRegionRequestSnafu {
913 region_id: self.region_id,
914 err: format!("column {} not found", set_default.name),
915 }
916 .fail();
917 }
918 }
919 Ok(())
920 }
921}
922
923struct SkippedFields {
925 schema: SchemaRef,
927 time_index: ColumnId,
929 id_to_index: HashMap<ColumnId, usize>,
931}
932
933impl SkippedFields {
934 fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
936 let column_schemas = column_metadatas
937 .iter()
938 .map(|column_metadata| column_metadata.column_schema.clone())
939 .collect();
940 let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
941 let time_index = column_metadatas
942 .iter()
943 .find_map(|col| {
944 if col.semantic_type == SemanticType::Timestamp {
945 Some(col.column_id)
946 } else {
947 None
948 }
949 })
950 .context(InvalidMetaSnafu {
951 reason: "time index not found",
952 })?;
953 let id_to_index = column_metadatas
954 .iter()
955 .enumerate()
956 .map(|(idx, col)| (col.column_id, idx))
957 .collect();
958
959 Ok(SkippedFields {
960 schema,
961 time_index,
962 id_to_index,
963 })
964 }
965}
966
967#[derive(Snafu)]
968#[snafu(visibility(pub))]
969#[stack_trace_debug]
970pub enum MetadataError {
971 #[snafu(display("Invalid schema"))]
972 InvalidSchema {
973 source: datatypes::error::Error,
974 #[snafu(implicit)]
975 location: Location,
976 },
977
978 #[snafu(display("Invalid metadata, {}", reason))]
979 InvalidMeta {
980 reason: String,
981 #[snafu(implicit)]
982 location: Location,
983 },
984
985 #[snafu(display("Failed to ser/de json object"))]
986 SerdeJson {
987 #[snafu(implicit)]
988 location: Location,
989 #[snafu(source)]
990 error: serde_json::Error,
991 },
992
993 #[snafu(display("Invalid raw region request, err: {}", err))]
994 InvalidRawRegionRequest {
995 err: String,
996 #[snafu(implicit)]
997 location: Location,
998 },
999
1000 #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1001 InvalidRegionRequest {
1002 region_id: RegionId,
1003 err: String,
1004 #[snafu(implicit)]
1005 location: Location,
1006 },
1007
1008 #[snafu(display("Unexpected schema error during project"))]
1009 SchemaProject {
1010 origin_schema: SchemaRef,
1011 projection: Vec<ColumnId>,
1012 #[snafu(implicit)]
1013 location: Location,
1014 source: datatypes::Error,
1015 },
1016
1017 #[snafu(display("Time index column not found"))]
1018 TimeIndexNotFound {
1019 #[snafu(implicit)]
1020 location: Location,
1021 },
1022
1023 #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1024 ChangeColumnNotFound {
1025 column_name: String,
1026 region_id: RegionId,
1027 #[snafu(implicit)]
1028 location: Location,
1029 },
1030
1031 #[snafu(display("Failed to convert column schema"))]
1032 ConvertColumnSchema {
1033 source: api::error::Error,
1034 #[snafu(implicit)]
1035 location: Location,
1036 },
1037
1038 #[snafu(display("Failed to convert TimeRanges"))]
1039 ConvertTimeRanges {
1040 source: api::error::Error,
1041 #[snafu(implicit)]
1042 location: Location,
1043 },
1044
1045 #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1046 InvalidSetRegionOptionRequest {
1047 key: String,
1048 value: String,
1049 #[snafu(implicit)]
1050 location: Location,
1051 },
1052
1053 #[snafu(display("Invalid set region option request, key: {}", key))]
1054 InvalidUnsetRegionOptionRequest {
1055 key: String,
1056 #[snafu(implicit)]
1057 location: Location,
1058 },
1059
1060 #[snafu(display("Failed to decode protobuf"))]
1061 DecodeProto {
1062 #[snafu(source)]
1063 error: prost::UnknownEnumValue,
1064 #[snafu(implicit)]
1065 location: Location,
1066 },
1067
1068 #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1069 InvalidColumnOption {
1070 column_name: String,
1071 msg: String,
1072 #[snafu(implicit)]
1073 location: Location,
1074 },
1075
1076 #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1077 SetFulltextOptions {
1078 column_name: String,
1079 source: datatypes::Error,
1080 #[snafu(implicit)]
1081 location: Location,
1082 },
1083
1084 #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1085 GetFulltextOptions {
1086 column_name: String,
1087 source: datatypes::Error,
1088 #[snafu(implicit)]
1089 location: Location,
1090 },
1091
1092 #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1093 SetSkippingIndexOptions {
1094 column_name: String,
1095 source: datatypes::Error,
1096 #[snafu(implicit)]
1097 location: Location,
1098 },
1099
1100 #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1101 UnsetSkippingIndexOptions {
1102 column_name: String,
1103 source: datatypes::Error,
1104 #[snafu(implicit)]
1105 location: Location,
1106 },
1107
1108 #[snafu(display("Failed to decode arrow ipc record batches"))]
1109 DecodeArrowIpc {
1110 #[snafu(source)]
1111 error: arrow::error::ArrowError,
1112 #[snafu(implicit)]
1113 location: Location,
1114 },
1115
1116 #[snafu(display("Failed to cast default value, reason: {}", reason))]
1117 CastDefaultValue {
1118 reason: String,
1119 source: datatypes::Error,
1120 #[snafu(implicit)]
1121 location: Location,
1122 },
1123
1124 #[snafu(display("Unexpected: {}", reason))]
1125 Unexpected {
1126 reason: String,
1127 #[snafu(implicit)]
1128 location: Location,
1129 },
1130
1131 #[snafu(display("Failed to encode/decode flight message"))]
1132 FlightCodec {
1133 source: common_grpc::Error,
1134 #[snafu(implicit)]
1135 location: Location,
1136 },
1137
1138 #[snafu(display("Invalid index option"))]
1139 InvalidIndexOption {
1140 #[snafu(implicit)]
1141 location: Location,
1142 #[snafu(source)]
1143 error: datatypes::error::Error,
1144 },
1145
1146 #[snafu(display("Sql common error"))]
1147 SqlCommon {
1148 source: common_sql::error::Error,
1149 #[snafu(implicit)]
1150 location: Location,
1151 },
1152}
1153
1154impl ErrorExt for MetadataError {
1155 fn status_code(&self) -> StatusCode {
1156 match self {
1157 Self::SqlCommon { source, .. } => source.status_code(),
1158 _ => StatusCode::InvalidArguments,
1159 }
1160 }
1161
1162 fn as_any(&self) -> &dyn Any {
1163 self
1164 }
1165}
1166
1167fn set_column_fulltext_options(
1176 column_meta: &mut ColumnMetadata,
1177 column_name: &str,
1178 options: &FulltextOptions,
1179 current_options: Option<FulltextOptions>,
1180) -> Result<()> {
1181 if let Some(current_options) = current_options {
1182 ensure!(
1183 current_options.analyzer == options.analyzer
1184 && current_options.case_sensitive == options.case_sensitive,
1185 InvalidColumnOptionSnafu {
1186 column_name,
1187 msg: format!("Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1188 current_options.analyzer, current_options.case_sensitive),
1189 }
1190 );
1191 }
1192
1193 column_meta
1194 .column_schema
1195 .set_fulltext_options(options)
1196 .context(SetFulltextOptionsSnafu { column_name })?;
1197
1198 Ok(())
1199}
1200
1201fn unset_column_fulltext_options(
1202 column_meta: &mut ColumnMetadata,
1203 column_name: &str,
1204 current_options: Option<FulltextOptions>,
1205) -> Result<()> {
1206 if let Some(mut current_options) = current_options
1207 && current_options.enable
1208 {
1209 current_options.enable = false;
1210 column_meta
1211 .column_schema
1212 .set_fulltext_options(¤t_options)
1213 .context(SetFulltextOptionsSnafu { column_name })?;
1214 } else {
1215 return InvalidColumnOptionSnafu {
1216 column_name,
1217 msg: "FULLTEXT index already disabled",
1218 }
1219 .fail();
1220 }
1221
1222 Ok(())
1223}
1224
1225#[cfg(test)]
1226mod test {
1227 use datatypes::prelude::ConcreteDataType;
1228 use datatypes::schema::{
1229 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1230 };
1231 use datatypes::value::Value;
1232
1233 use super::*;
1234
1235 fn create_builder() -> RegionMetadataBuilder {
1236 RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1237 }
1238
1239 fn build_test_region_metadata() -> RegionMetadata {
1240 let mut builder = create_builder();
1241 builder
1242 .push_column_metadata(ColumnMetadata {
1243 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1244 semantic_type: SemanticType::Tag,
1245 column_id: 1,
1246 })
1247 .push_column_metadata(ColumnMetadata {
1248 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1249 semantic_type: SemanticType::Field,
1250 column_id: 2,
1251 })
1252 .push_column_metadata(ColumnMetadata {
1253 column_schema: ColumnSchema::new(
1254 "c",
1255 ConcreteDataType::timestamp_millisecond_datatype(),
1256 false,
1257 ),
1258 semantic_type: SemanticType::Timestamp,
1259 column_id: 3,
1260 })
1261 .primary_key(vec![1]);
1262 builder.build().unwrap()
1263 }
1264
1265 #[test]
1266 fn test_region_metadata() {
1267 let region_metadata = build_test_region_metadata();
1268 assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1269 assert_eq!(
1270 "a",
1271 region_metadata.column_by_id(1).unwrap().column_schema.name
1272 );
1273 assert_eq!(None, region_metadata.column_by_id(10));
1274 }
1275
1276 #[test]
1277 fn test_region_metadata_serde() {
1278 let region_metadata = build_test_region_metadata();
1279 let serialized = serde_json::to_string(®ion_metadata).unwrap();
1280 let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1281 assert_eq!(region_metadata, deserialized);
1282 }
1283
1284 #[test]
1285 fn test_column_metadata_validate() {
1286 let mut builder = create_builder();
1287 let col = ColumnMetadata {
1288 column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1289 semantic_type: SemanticType::Timestamp,
1290 column_id: 1,
1291 };
1292
1293 builder.push_column_metadata(col);
1294 let err = builder.build().unwrap_err();
1295 assert!(
1296 err.to_string()
1297 .contains("column `ts` is not timestamp type"),
1298 "unexpected err: {err}",
1299 );
1300 }
1301
1302 #[test]
1303 fn test_empty_region_metadata() {
1304 let builder = create_builder();
1305 let err = builder.build().unwrap_err();
1306 assert!(
1308 err.to_string().contains("time index not found"),
1309 "unexpected err: {err}",
1310 );
1311 }
1312
1313 #[test]
1314 fn test_same_column_id() {
1315 let mut builder = create_builder();
1316 builder
1317 .push_column_metadata(ColumnMetadata {
1318 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1319 semantic_type: SemanticType::Tag,
1320 column_id: 1,
1321 })
1322 .push_column_metadata(ColumnMetadata {
1323 column_schema: ColumnSchema::new(
1324 "b",
1325 ConcreteDataType::timestamp_millisecond_datatype(),
1326 false,
1327 ),
1328 semantic_type: SemanticType::Timestamp,
1329 column_id: 1,
1330 });
1331 let err = builder.build().unwrap_err();
1332 assert!(
1333 err.to_string()
1334 .contains("column a and b have the same column id"),
1335 "unexpected err: {err}",
1336 );
1337 }
1338
1339 #[test]
1340 fn test_duplicate_time_index() {
1341 let mut builder = create_builder();
1342 builder
1343 .push_column_metadata(ColumnMetadata {
1344 column_schema: ColumnSchema::new(
1345 "a",
1346 ConcreteDataType::timestamp_millisecond_datatype(),
1347 false,
1348 ),
1349 semantic_type: SemanticType::Timestamp,
1350 column_id: 1,
1351 })
1352 .push_column_metadata(ColumnMetadata {
1353 column_schema: ColumnSchema::new(
1354 "b",
1355 ConcreteDataType::timestamp_millisecond_datatype(),
1356 false,
1357 ),
1358 semantic_type: SemanticType::Timestamp,
1359 column_id: 2,
1360 });
1361 let err = builder.build().unwrap_err();
1362 assert!(
1363 err.to_string().contains("expect only one time index"),
1364 "unexpected err: {err}",
1365 );
1366 }
1367
1368 #[test]
1369 fn test_unknown_primary_key() {
1370 let mut builder = create_builder();
1371 builder
1372 .push_column_metadata(ColumnMetadata {
1373 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1374 semantic_type: SemanticType::Tag,
1375 column_id: 1,
1376 })
1377 .push_column_metadata(ColumnMetadata {
1378 column_schema: ColumnSchema::new(
1379 "b",
1380 ConcreteDataType::timestamp_millisecond_datatype(),
1381 false,
1382 ),
1383 semantic_type: SemanticType::Timestamp,
1384 column_id: 2,
1385 })
1386 .primary_key(vec![3]);
1387 let err = builder.build().unwrap_err();
1388 assert!(
1389 err.to_string().contains("unknown column id 3"),
1390 "unexpected err: {err}",
1391 );
1392 }
1393
1394 #[test]
1395 fn test_same_primary_key() {
1396 let mut builder = create_builder();
1397 builder
1398 .push_column_metadata(ColumnMetadata {
1399 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1400 semantic_type: SemanticType::Tag,
1401 column_id: 1,
1402 })
1403 .push_column_metadata(ColumnMetadata {
1404 column_schema: ColumnSchema::new(
1405 "b",
1406 ConcreteDataType::timestamp_millisecond_datatype(),
1407 false,
1408 ),
1409 semantic_type: SemanticType::Timestamp,
1410 column_id: 2,
1411 })
1412 .primary_key(vec![1, 1]);
1413 let err = builder.build().unwrap_err();
1414 assert!(
1415 err.to_string()
1416 .contains("duplicate column a in primary key"),
1417 "unexpected err: {err}",
1418 );
1419 }
1420
1421 #[test]
1422 fn test_in_time_index() {
1423 let mut builder = create_builder();
1424 builder
1425 .push_column_metadata(ColumnMetadata {
1426 column_schema: ColumnSchema::new(
1427 "ts",
1428 ConcreteDataType::timestamp_millisecond_datatype(),
1429 false,
1430 ),
1431 semantic_type: SemanticType::Timestamp,
1432 column_id: 1,
1433 })
1434 .primary_key(vec![1]);
1435 let err = builder.build().unwrap_err();
1436 assert!(
1437 err.to_string()
1438 .contains("column ts is already a time index column"),
1439 "unexpected err: {err}",
1440 );
1441 }
1442
1443 #[test]
1444 fn test_nullable_time_index() {
1445 let mut builder = create_builder();
1446 builder.push_column_metadata(ColumnMetadata {
1447 column_schema: ColumnSchema::new(
1448 "ts",
1449 ConcreteDataType::timestamp_millisecond_datatype(),
1450 true,
1451 ),
1452 semantic_type: SemanticType::Timestamp,
1453 column_id: 1,
1454 });
1455 let err = builder.build().unwrap_err();
1456 assert!(
1457 err.to_string()
1458 .contains("time index column ts must be NOT NULL"),
1459 "unexpected err: {err}",
1460 );
1461 }
1462
1463 #[test]
1464 fn test_primary_key_semantic_type() {
1465 let mut builder = create_builder();
1466 builder
1467 .push_column_metadata(ColumnMetadata {
1468 column_schema: ColumnSchema::new(
1469 "ts",
1470 ConcreteDataType::timestamp_millisecond_datatype(),
1471 false,
1472 ),
1473 semantic_type: SemanticType::Timestamp,
1474 column_id: 1,
1475 })
1476 .push_column_metadata(ColumnMetadata {
1477 column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1478 semantic_type: SemanticType::Field,
1479 column_id: 2,
1480 })
1481 .primary_key(vec![2]);
1482 let err = builder.build().unwrap_err();
1483 assert!(
1484 err.to_string()
1485 .contains("semantic type of column a should be Tag, not Field"),
1486 "unexpected err: {err}",
1487 );
1488 }
1489
1490 #[test]
1491 fn test_primary_key_tag_num() {
1492 let mut builder = create_builder();
1493 builder
1494 .push_column_metadata(ColumnMetadata {
1495 column_schema: ColumnSchema::new(
1496 "ts",
1497 ConcreteDataType::timestamp_millisecond_datatype(),
1498 false,
1499 ),
1500 semantic_type: SemanticType::Timestamp,
1501 column_id: 1,
1502 })
1503 .push_column_metadata(ColumnMetadata {
1504 column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1505 semantic_type: SemanticType::Tag,
1506 column_id: 2,
1507 })
1508 .push_column_metadata(ColumnMetadata {
1509 column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1510 semantic_type: SemanticType::Tag,
1511 column_id: 3,
1512 })
1513 .primary_key(vec![2]);
1514 let err = builder.build().unwrap_err();
1515 assert!(
1516 err.to_string()
1517 .contains("number of primary key columns 1 not equal to tag columns 2"),
1518 "unexpected err: {err}",
1519 );
1520 }
1521
1522 #[test]
1523 fn test_bump_version() {
1524 let mut region_metadata = build_test_region_metadata();
1525 let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1526 builder.bump_version();
1527 let new_meta = builder.build().unwrap();
1528 region_metadata.schema_version += 1;
1529 assert_eq!(region_metadata, new_meta);
1530 }
1531
1532 fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1533 let semantic_type = if is_tag {
1534 SemanticType::Tag
1535 } else {
1536 SemanticType::Field
1537 };
1538 ColumnMetadata {
1539 column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1540 semantic_type,
1541 column_id,
1542 }
1543 }
1544
1545 fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1546 let actual: Vec<_> = metadata
1547 .column_metadatas
1548 .iter()
1549 .map(|col| &col.column_schema.name)
1550 .collect();
1551 assert_eq!(names, actual);
1552 }
1553
1554 fn get_columns_default_constraint(
1555 metadata: &RegionMetadata,
1556 name: String,
1557 ) -> Option<Option<&ColumnDefaultConstraint>> {
1558 metadata.column_metadatas.iter().find_map(|col| {
1559 if col.column_schema.name == name {
1560 Some(col.column_schema.default_constraint())
1561 } else {
1562 None
1563 }
1564 })
1565 }
1566
1567 #[test]
1568 fn test_alter() {
1569 let metadata = build_test_region_metadata();
1571 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1572 builder
1574 .alter(AlterKind::AddColumns {
1575 columns: vec![AddColumn {
1576 column_metadata: new_column_metadata("d", true, 4),
1577 location: None,
1578 }],
1579 })
1580 .unwrap();
1581 let metadata = builder.build().unwrap();
1582 check_columns(&metadata, &["a", "b", "c", "d"]);
1583 assert_eq!([1, 4], &metadata.primary_key[..]);
1584
1585 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1586 builder
1587 .alter(AlterKind::AddColumns {
1588 columns: vec![AddColumn {
1589 column_metadata: new_column_metadata("e", false, 5),
1590 location: Some(AddColumnLocation::First),
1591 }],
1592 })
1593 .unwrap();
1594 let metadata = builder.build().unwrap();
1595 check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1596
1597 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1598 builder
1599 .alter(AlterKind::AddColumns {
1600 columns: vec![AddColumn {
1601 column_metadata: new_column_metadata("f", false, 6),
1602 location: Some(AddColumnLocation::After {
1603 column_name: "b".to_string(),
1604 }),
1605 }],
1606 })
1607 .unwrap();
1608 let metadata = builder.build().unwrap();
1609 check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1610
1611 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1612 builder
1613 .alter(AlterKind::AddColumns {
1614 columns: vec![AddColumn {
1615 column_metadata: new_column_metadata("g", false, 7),
1616 location: Some(AddColumnLocation::After {
1617 column_name: "d".to_string(),
1618 }),
1619 }],
1620 })
1621 .unwrap();
1622 let metadata = builder.build().unwrap();
1623 check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1624
1625 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1626 builder
1627 .alter(AlterKind::DropColumns {
1628 names: vec!["g".to_string(), "e".to_string()],
1629 })
1630 .unwrap();
1631 let metadata = builder.build().unwrap();
1632 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1633
1634 let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1635 builder
1636 .alter(AlterKind::DropColumns {
1637 names: vec!["a".to_string()],
1638 })
1639 .unwrap();
1640 let err = builder.build().unwrap_err();
1642 assert_eq!(StatusCode::InvalidArguments, err.status_code());
1643
1644 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1645 let mut column_metadata = new_column_metadata("g", false, 8);
1646 let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1647 column_metadata.column_schema = column_metadata
1648 .column_schema
1649 .with_default_constraint(default_constraint.clone())
1650 .unwrap();
1651 builder
1652 .alter(AlterKind::AddColumns {
1653 columns: vec![AddColumn {
1654 column_metadata,
1655 location: None,
1656 }],
1657 })
1658 .unwrap();
1659 let metadata = builder.build().unwrap();
1660 assert_eq!(
1661 get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1662 default_constraint.as_ref()
1663 );
1664 check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1665
1666 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1667 builder
1668 .alter(AlterKind::DropDefaults {
1669 names: vec!["g".to_string()],
1670 })
1671 .unwrap();
1672 let metadata = builder.build().unwrap();
1673 assert_eq!(
1674 get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1675 None
1676 );
1677 check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1678
1679 let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1680 builder
1681 .alter(AlterKind::DropColumns {
1682 names: vec!["g".to_string()],
1683 })
1684 .unwrap();
1685 let metadata = builder.build().unwrap();
1686 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1687
1688 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1689 builder
1690 .alter(AlterKind::ModifyColumnTypes {
1691 columns: vec![ModifyColumnType {
1692 column_name: "b".to_string(),
1693 target_type: ConcreteDataType::string_datatype(),
1694 }],
1695 })
1696 .unwrap();
1697 let metadata = builder.build().unwrap();
1698 check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1699 let b_type = &metadata
1700 .column_by_name("b")
1701 .unwrap()
1702 .column_schema
1703 .data_type;
1704 assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1705
1706 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1707 builder
1708 .alter(AlterKind::SetIndexes {
1709 options: vec![SetIndexOption::Fulltext {
1710 column_name: "b".to_string(),
1711 options: FulltextOptions::new_unchecked(
1712 true,
1713 FulltextAnalyzer::Chinese,
1714 true,
1715 FulltextBackend::Bloom,
1716 1000,
1717 0.01,
1718 ),
1719 }],
1720 })
1721 .unwrap();
1722 let metadata = builder.build().unwrap();
1723 let a_fulltext_options = metadata
1724 .column_by_name("b")
1725 .unwrap()
1726 .column_schema
1727 .fulltext_options()
1728 .unwrap()
1729 .unwrap();
1730 assert!(a_fulltext_options.enable);
1731 assert_eq!(
1732 datatypes::schema::FulltextAnalyzer::Chinese,
1733 a_fulltext_options.analyzer
1734 );
1735 assert!(a_fulltext_options.case_sensitive);
1736
1737 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1738 builder
1739 .alter(AlterKind::UnsetIndexes {
1740 options: vec![UnsetIndexOption::Fulltext {
1741 column_name: "b".to_string(),
1742 }],
1743 })
1744 .unwrap();
1745 let metadata = builder.build().unwrap();
1746 let a_fulltext_options = metadata
1747 .column_by_name("b")
1748 .unwrap()
1749 .column_schema
1750 .fulltext_options()
1751 .unwrap()
1752 .unwrap();
1753 assert!(!a_fulltext_options.enable);
1754 assert_eq!(
1755 datatypes::schema::FulltextAnalyzer::Chinese,
1756 a_fulltext_options.analyzer
1757 );
1758 assert!(a_fulltext_options.case_sensitive);
1759 }
1760
1761 #[test]
1762 fn test_add_if_not_exists() {
1763 let metadata = build_test_region_metadata();
1765 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1766 builder
1768 .alter(AlterKind::AddColumns {
1769 columns: vec![
1770 AddColumn {
1771 column_metadata: new_column_metadata("d", true, 4),
1772 location: None,
1773 },
1774 AddColumn {
1775 column_metadata: new_column_metadata("d", true, 4),
1776 location: None,
1777 },
1778 ],
1779 })
1780 .unwrap();
1781 let metadata = builder.build().unwrap();
1782 check_columns(&metadata, &["a", "b", "c", "d"]);
1783 assert_eq!([1, 4], &metadata.primary_key[..]);
1784
1785 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1786 builder
1788 .alter(AlterKind::AddColumns {
1789 columns: vec![AddColumn {
1790 column_metadata: new_column_metadata("b", false, 2),
1791 location: None,
1792 }],
1793 })
1794 .unwrap();
1795 let metadata = builder.build().unwrap();
1796 check_columns(&metadata, &["a", "b", "c", "d"]);
1797 }
1798
1799 #[test]
1800 fn test_add_column_with_inverted_index() {
1801 let metadata = build_test_region_metadata();
1805 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1806 let mut col = new_column_metadata("d", true, 4);
1808 col.column_schema.set_inverted_index(true);
1809 builder
1810 .alter(AlterKind::AddColumns {
1811 columns: vec![
1812 AddColumn {
1813 column_metadata: col,
1814 location: None,
1815 },
1816 AddColumn {
1817 column_metadata: new_column_metadata("e", true, 5),
1818 location: None,
1819 },
1820 ],
1821 })
1822 .unwrap();
1823 let metadata = builder.build().unwrap();
1824 check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1825 assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1826 let column_metadata = metadata.column_by_name("a").unwrap();
1827 assert!(!column_metadata.column_schema.is_inverted_indexed());
1828 let column_metadata = metadata.column_by_name("b").unwrap();
1829 assert!(!column_metadata.column_schema.is_inverted_indexed());
1830 let column_metadata = metadata.column_by_name("c").unwrap();
1831 assert!(!column_metadata.column_schema.is_inverted_indexed());
1832 let column_metadata = metadata.column_by_name("d").unwrap();
1833 assert!(column_metadata.column_schema.is_inverted_indexed());
1834 let column_metadata = metadata.column_by_name("e").unwrap();
1835 assert!(!column_metadata.column_schema.is_inverted_indexed());
1836 }
1837
1838 #[test]
1839 fn test_drop_if_exists() {
1840 let metadata = build_test_region_metadata();
1842 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1843 builder
1845 .alter(AlterKind::AddColumns {
1846 columns: vec![
1847 AddColumn {
1848 column_metadata: new_column_metadata("d", false, 4),
1849 location: None,
1850 },
1851 AddColumn {
1852 column_metadata: new_column_metadata("e", false, 5),
1853 location: None,
1854 },
1855 ],
1856 })
1857 .unwrap();
1858 let metadata = builder.build().unwrap();
1859 check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1860
1861 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1862 builder
1863 .alter(AlterKind::DropColumns {
1864 names: vec!["b".to_string(), "b".to_string()],
1865 })
1866 .unwrap();
1867 let metadata = builder.build().unwrap();
1868 check_columns(&metadata, &["a", "c", "d", "e"]);
1869
1870 let mut builder = RegionMetadataBuilder::from_existing(metadata);
1871 builder
1872 .alter(AlterKind::DropColumns {
1873 names: vec!["b".to_string(), "e".to_string()],
1874 })
1875 .unwrap();
1876 let metadata = builder.build().unwrap();
1877 check_columns(&metadata, &["a", "c", "d"]);
1878 }
1879
1880 #[test]
1881 fn test_invalid_column_name() {
1882 let mut builder = create_builder();
1883 builder.push_column_metadata(ColumnMetadata {
1884 column_schema: ColumnSchema::new(
1885 "__sequence",
1886 ConcreteDataType::timestamp_millisecond_datatype(),
1887 false,
1888 ),
1889 semantic_type: SemanticType::Timestamp,
1890 column_id: 1,
1891 });
1892 let err = builder.build().unwrap_err();
1893 assert!(
1894 err.to_string()
1895 .contains("internal column name that can not be used"),
1896 "unexpected err: {err}",
1897 );
1898 }
1899
1900 #[test]
1901 fn test_debug_for_column_metadata() {
1902 let region_metadata = build_test_region_metadata();
1903 let formatted = format!("{:?}", region_metadata);
1904 assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
1905 }
1906
1907 #[test]
1908 fn test_region_metadata_deserialize_default_primary_key_encoding() {
1909 let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
1910 let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1911 assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
1912
1913 let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
1914 let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
1915 assert_eq!(
1916 deserialized.primary_key_encoding,
1917 PrimaryKeyEncoding::Sparse
1918 );
1919 }
1920}