1use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_base::hash::partition_expr_version;
28use common_error::ext::ErrorExt;
29use common_error::status_code::StatusCode;
30use common_macro::stack_trace_debug;
31use datatypes::arrow;
32use datatypes::arrow::datatypes::FieldRef;
33use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
34use datatypes::types::TimestampType;
35use itertools::Itertools;
36use serde::de::Error;
37use serde::{Deserialize, Deserializer, Serialize};
38use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
39
40use crate::codec::PrimaryKeyEncoding;
41use crate::region_request::{
42 AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
43};
44use crate::storage::consts::is_internal_column;
45use crate::storage::{ColumnId, RegionId};
46
47pub type Result<T> = std::result::Result<T, MetadataError>;
48
49#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
51pub struct ColumnMetadata {
52 pub column_schema: ColumnSchema,
54 pub semantic_type: SemanticType,
56 pub column_id: ColumnId,
58}
59
60impl fmt::Debug for ColumnMetadata {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(
63 f,
64 "[{:?} {:?} {:?}]",
65 self.column_schema, self.semantic_type, self.column_id,
66 )
67 }
68}
69
70impl ColumnMetadata {
71 pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
73 let column_id = column_def.column_id;
74 let column_def = column_def
75 .column_def
76 .context(InvalidRawRegionRequestSnafu {
77 err: "column_def is absent",
78 })?;
79 let semantic_type = column_def.semantic_type();
80 let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
81
82 Ok(Self {
83 column_schema,
84 semantic_type,
85 column_id,
86 })
87 }
88
89 pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
91 serde_json::to_vec(columns)
92 }
93
94 pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
96 serde_json::from_slice(bytes)
97 }
98
99 pub fn is_same_datatype(&self, other: &Self) -> bool {
100 self.column_schema.data_type == other.column_schema.data_type
101 }
102}
103
104#[cfg_attr(doc, aquamarine::aquamarine)]
105#[derive(Clone, PartialEq, Eq, Serialize)]
129pub struct RegionMetadata {
130 #[serde(skip)]
132 pub schema: SchemaRef,
133
134 #[serde(skip)]
138 time_index: ColumnId,
139 #[serde(skip)]
141 id_to_index: HashMap<ColumnId, usize>,
142
143 pub column_metadatas: Vec<ColumnMetadata>,
146 pub primary_key: Vec<ColumnId>,
148
149 pub region_id: RegionId,
151 pub schema_version: u64,
155
156 pub primary_key_encoding: PrimaryKeyEncoding,
158
159 pub partition_expr: Option<String>,
164 #[serde(skip)]
165 pub partition_expr_version: u64,
166}
167
168impl fmt::Debug for RegionMetadata {
169 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
170 f.debug_struct("RegionMetadata")
171 .field("column_metadatas", &self.column_metadatas)
172 .field("time_index", &self.time_index)
173 .field("primary_key", &self.primary_key)
174 .field("region_id", &self.region_id)
175 .field("schema_version", &self.schema_version)
176 .field("partition_expr", &self.partition_expr)
177 .finish()
178 }
179}
180
181pub type RegionMetadataRef = Arc<RegionMetadata>;
182
183impl<'de> Deserialize<'de> for RegionMetadata {
184 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
185 where
186 D: Deserializer<'de>,
187 {
188 #[derive(Deserialize)]
190 struct RegionMetadataWithoutSchema {
191 column_metadatas: Vec<ColumnMetadata>,
192 primary_key: Vec<ColumnId>,
193 region_id: RegionId,
194 schema_version: u64,
195 #[serde(default)]
196 primary_key_encoding: PrimaryKeyEncoding,
197 #[serde(default)]
198 partition_expr: Option<String>,
199 }
200
201 let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
202 let skipped =
203 SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
204
205 let partition_expr_version =
206 partition_expr_version(without_schema.partition_expr.as_deref());
207
208 Ok(Self {
209 schema: skipped.schema,
210 time_index: skipped.time_index,
211 id_to_index: skipped.id_to_index,
212 column_metadatas: without_schema.column_metadatas,
213 primary_key: without_schema.primary_key,
214 region_id: without_schema.region_id,
215 schema_version: without_schema.schema_version,
216 primary_key_encoding: without_schema.primary_key_encoding,
217 partition_expr: without_schema.partition_expr,
218 partition_expr_version,
219 })
220 }
221}
222
223impl RegionMetadata {
224 pub fn from_json(s: &str) -> Result<Self> {
226 serde_json::from_str(s).context(SerdeJsonSnafu)
227 }
228
229 pub fn to_json(&self) -> Result<String> {
231 serde_json::to_string(&self).context(SerdeJsonSnafu)
232 }
233
234 pub fn set_partition_expr(&mut self, expr: Option<String>) {
235 self.partition_expr_version = partition_expr_version(expr.as_deref());
236 self.partition_expr = expr;
237 }
238
239 pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
241 self.id_to_index
242 .get(&column_id)
243 .map(|index| &self.column_metadatas[*index])
244 }
245
246 pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
248 self.id_to_index.get(&column_id).copied()
249 }
250
251 pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
253 self.column_metadatas
254 .iter()
255 .position(|col| col.column_schema.name == column_name)
256 }
257
258 pub fn time_index_column(&self) -> &ColumnMetadata {
263 let index = self.id_to_index[&self.time_index];
264 &self.column_metadatas[index]
265 }
266
267 pub fn time_index_type(&self) -> TimestampType {
272 let index = self.id_to_index[&self.time_index];
273 self.column_metadatas[index]
274 .column_schema
275 .data_type
276 .as_timestamp()
277 .unwrap()
278 }
279
280 pub fn time_index_column_pos(&self) -> usize {
282 self.id_to_index[&self.time_index]
283 }
284
285 pub fn time_index_field(&self) -> FieldRef {
287 let index = self.id_to_index[&self.time_index];
288 self.schema.arrow_schema().fields[index].clone()
289 }
290
291 pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
293 self.schema
294 .column_index_by_name(name)
295 .map(|index| &self.column_metadatas[index])
296 }
297
298 pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
300 self.primary_key
302 .iter()
303 .map(|id| self.column_by_id(*id).unwrap())
304 }
305
306 pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
311 self.column_metadatas
312 .iter()
313 .filter(|column| column.semantic_type == SemanticType::Field)
314 }
315
316 pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
320 self.primary_key.iter().position(|id| *id == column_id)
321 }
322
323 pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
327 ensure!(
329 projection.contains(&self.time_index),
330 TimeIndexNotFoundSnafu
331 );
332
333 let indices_to_preserve = projection
335 .iter()
336 .map(|id| {
337 self.column_index_by_id(*id)
338 .with_context(|| InvalidRegionRequestSnafu {
339 region_id: self.region_id,
340 err: format!("column id {} not found", id),
341 })
342 })
343 .collect::<Result<Vec<_>>>()?;
344
345 let projected_schema =
347 self.schema
348 .try_project(&indices_to_preserve)
349 .with_context(|_| SchemaProjectSnafu {
350 origin_schema: self.schema.clone(),
351 projection: projection.to_vec(),
352 })?;
353
354 let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
356 let mut projected_primary_key = vec![];
357 let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
358 for index in indices_to_preserve {
359 let col = self.column_metadatas[index].clone();
360 if col.semantic_type == SemanticType::Tag {
361 projected_primary_key.push(col.column_id);
362 }
363 projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
364 projected_column_metadatas.push(col);
365 }
366
367 Ok(RegionMetadata {
368 schema: Arc::new(projected_schema),
369 time_index: self.time_index,
370 id_to_index: projected_id_to_index,
371 column_metadatas: projected_column_metadatas,
372 primary_key: projected_primary_key,
373 region_id: self.region_id,
374 schema_version: self.schema_version,
375 primary_key_encoding: self.primary_key_encoding,
376 partition_expr: self.partition_expr.clone(),
377 partition_expr_version: partition_expr_version(self.partition_expr.as_deref()),
378 })
379 }
380
381 pub fn inverted_indexed_column_ids<'a>(
383 &self,
384 ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
385 ) -> HashSet<ColumnId> {
386 let mut inverted_index = self
387 .column_metadatas
388 .iter()
389 .filter(|column| column.column_schema.is_inverted_indexed())
390 .map(|column| column.column_id)
391 .collect::<HashSet<_>>();
392
393 for ignored in ignore_column_ids {
394 inverted_index.remove(ignored);
395 }
396
397 inverted_index
398 }
399
400 pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
403 self.column_metadatas
404 .iter()
405 .filter_map(|column| {
406 column
407 .column_schema
408 .vector_index_options()
409 .ok()
410 .flatten()
411 .map(|options| (column.column_id, options))
412 })
413 .collect()
414 }
415
416 fn validate(&self) -> Result<()> {
418 let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
420 for col in &self.column_metadatas {
421 Self::validate_column_metadata(col)?;
423
424 ensure!(
427 !id_names.contains_key(&col.column_id),
428 InvalidMetaSnafu {
429 reason: format!(
430 "column {} and {} have the same column id {}",
431 id_names[&col.column_id], col.column_schema.name, col.column_id,
432 ),
433 }
434 );
435 id_names.insert(col.column_id, &col.column_schema.name);
436 }
437
438 let time_indexes = self
440 .column_metadatas
441 .iter()
442 .filter(|col| col.semantic_type == SemanticType::Timestamp)
443 .collect::<Vec<_>>();
444 ensure!(
445 time_indexes.len() == 1,
446 InvalidMetaSnafu {
447 reason: format!(
448 "expect only one time index, found {}: {}",
449 time_indexes.len(),
450 time_indexes
451 .iter()
452 .map(|c| &c.column_schema.name)
453 .join(", ")
454 ),
455 }
456 );
457
458 ensure!(
460 !self.time_index_column().column_schema.is_nullable(),
461 InvalidMetaSnafu {
462 reason: format!(
463 "time index column {} must be NOT NULL",
464 self.time_index_column().column_schema.name
465 ),
466 }
467 );
468
469 if !self.primary_key.is_empty() {
470 let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
471 for column_id in &self.primary_key {
473 ensure!(
475 id_names.contains_key(column_id),
476 InvalidMetaSnafu {
477 reason: format!("unknown column id {}", column_id),
478 }
479 );
480
481 let column = self.column_by_id(*column_id).unwrap();
483 ensure!(
485 !pk_ids.contains(&column_id),
486 InvalidMetaSnafu {
487 reason: format!(
488 "duplicate column {} in primary key",
489 column.column_schema.name
490 ),
491 }
492 );
493
494 ensure!(
496 *column_id != self.time_index,
497 InvalidMetaSnafu {
498 reason: format!(
499 "column {} is already a time index column",
500 column.column_schema.name,
501 ),
502 }
503 );
504
505 ensure!(
507 column.semantic_type == SemanticType::Tag,
508 InvalidMetaSnafu {
509 reason: format!(
510 "semantic type of column {} should be Tag, not {:?}",
511 column.column_schema.name, column.semantic_type
512 ),
513 }
514 );
515
516 pk_ids.insert(column_id);
517 }
518 }
519
520 let num_tag = self
522 .column_metadatas
523 .iter()
524 .filter(|col| col.semantic_type == SemanticType::Tag)
525 .count();
526 ensure!(
527 num_tag == self.primary_key.len(),
528 InvalidMetaSnafu {
529 reason: format!(
530 "number of primary key columns {} not equal to tag columns {}",
531 self.primary_key.len(),
532 num_tag
533 ),
534 }
535 );
536
537 Ok(())
538 }
539
540 fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
542 if column_metadata.semantic_type == SemanticType::Timestamp {
543 ensure!(
544 column_metadata.column_schema.data_type.is_timestamp(),
545 InvalidMetaSnafu {
546 reason: format!(
547 "column `{}` is not timestamp type",
548 column_metadata.column_schema.name
549 ),
550 }
551 );
552 }
553
554 ensure!(
555 !is_internal_column(&column_metadata.column_schema.name),
556 InvalidMetaSnafu {
557 reason: format!(
558 "{} is internal column name that can not be used",
559 column_metadata.column_schema.name
560 ),
561 }
562 );
563
564 Ok(())
565 }
566}
567
568pub struct RegionMetadataBuilder {
570 region_id: RegionId,
571 column_metadatas: Vec<ColumnMetadata>,
572 primary_key: Vec<ColumnId>,
573 schema_version: u64,
574 primary_key_encoding: PrimaryKeyEncoding,
575 partition_expr: Option<String>,
576}
577
578impl RegionMetadataBuilder {
579 pub fn new(id: RegionId) -> Self {
581 Self {
582 region_id: id,
583 column_metadatas: vec![],
584 primary_key: vec![],
585 schema_version: 0,
586 primary_key_encoding: PrimaryKeyEncoding::Dense,
587 partition_expr: None,
588 }
589 }
590
591 pub fn from_existing(existing: RegionMetadata) -> Self {
593 Self {
594 column_metadatas: existing.column_metadatas,
595 primary_key: existing.primary_key,
596 region_id: existing.region_id,
597 schema_version: existing.schema_version,
598 primary_key_encoding: existing.primary_key_encoding,
599 partition_expr: existing.partition_expr,
600 }
601 }
602
603 pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
605 self.primary_key_encoding = encoding;
606 self
607 }
608
609 pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
611 self.partition_expr = expr_json;
612 self
613 }
614
615 pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
617 self.column_metadatas.push(column_metadata);
618 self
619 }
620
621 pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
623 self.primary_key = key;
624 self
625 }
626
627 pub fn bump_version(&mut self) -> &mut Self {
629 self.schema_version += 1;
630 self
631 }
632
633 pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
637 match kind {
638 AlterKind::AddColumns { columns } => self.add_columns(columns)?,
639 AlterKind::DropColumns { names } => self.drop_columns(&names),
640 AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
641 AlterKind::SetIndexes { options } => self.set_indexes(options)?,
642 AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
643 AlterKind::SetRegionOptions { options: _ } => {
644 }
646 AlterKind::UnsetRegionOptions { keys: _ } => {
647 }
649 AlterKind::DropDefaults { names } => {
650 self.drop_defaults(names)?;
651 }
652 AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
653 AlterKind::SyncColumns { column_metadatas } => {
654 self.primary_key = column_metadatas
655 .iter()
656 .filter_map(|column_metadata| {
657 if column_metadata.semantic_type == SemanticType::Tag {
658 Some(column_metadata.column_id)
659 } else {
660 None
661 }
662 })
663 .collect::<Vec<_>>();
664 self.column_metadatas = column_metadatas;
665 }
666 }
667 Ok(self)
668 }
669
670 pub fn build(self) -> Result<RegionMetadata> {
672 self.build_with_options(true)
673 }
674
675 pub fn build_without_validation(self) -> Result<RegionMetadata> {
680 self.build_with_options(false)
681 }
682
683 fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
684 let skipped = SkippedFields::new(&self.column_metadatas)?;
685
686 let partition_expr_version = partition_expr_version(self.partition_expr.as_deref());
687 let meta = RegionMetadata {
688 schema: skipped.schema,
689 time_index: skipped.time_index,
690 id_to_index: skipped.id_to_index,
691 column_metadatas: self.column_metadatas,
692 primary_key: self.primary_key,
693 region_id: self.region_id,
694 schema_version: self.schema_version,
695 primary_key_encoding: self.primary_key_encoding,
696 partition_expr: self.partition_expr,
697 partition_expr_version,
698 };
699
700 if validate {
701 meta.validate()?;
702 }
703
704 Ok(meta)
705 }
706
707 fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
709 let mut names: HashSet<_> = self
710 .column_metadatas
711 .iter()
712 .map(|col| col.column_schema.name.clone())
713 .collect();
714
715 for add_column in columns {
716 if names.contains(&add_column.column_metadata.column_schema.name) {
717 continue;
719 }
720
721 let column_id = add_column.column_metadata.column_id;
722 let semantic_type = add_column.column_metadata.semantic_type;
723 let column_name = add_column.column_metadata.column_schema.name.clone();
724 match add_column.location {
725 None => {
726 self.column_metadatas.push(add_column.column_metadata);
727 }
728 Some(AddColumnLocation::First) => {
729 self.column_metadatas.insert(0, add_column.column_metadata);
730 }
731 Some(AddColumnLocation::After { column_name }) => {
732 let pos = self
733 .column_metadatas
734 .iter()
735 .position(|col| col.column_schema.name == column_name)
736 .context(InvalidRegionRequestSnafu {
737 region_id: self.region_id,
738 err: format!(
739 "column {} not found, failed to add column {} after it",
740 column_name, add_column.column_metadata.column_schema.name
741 ),
742 })?;
743 self.column_metadatas
745 .insert(pos + 1, add_column.column_metadata);
746 }
747 }
748 names.insert(column_name);
749 if semantic_type == SemanticType::Tag {
750 self.primary_key.push(column_id);
752 }
753 }
754
755 Ok(())
756 }
757
758 fn drop_columns(&mut self, names: &[String]) {
760 let name_set: HashSet<_> = names.iter().collect();
761 self.column_metadatas
762 .retain(|col| !name_set.contains(&col.column_schema.name));
763 }
764
765 fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
767 let mut change_type_map: HashMap<_, _> = columns
768 .into_iter()
769 .map(
770 |ModifyColumnType {
771 column_name,
772 target_type,
773 }| (column_name, target_type),
774 )
775 .collect();
776
777 for column_meta in self.column_metadatas.iter_mut() {
778 if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
779 column_meta.column_schema.data_type = target_type.clone();
780 let new_default =
782 if let Some(default_value) = column_meta.column_schema.default_constraint() {
783 Some(
784 default_value
785 .cast_to_datatype(&target_type)
786 .with_context(|_| CastDefaultValueSnafu {
787 reason: format!(
788 "Failed to cast default value from {:?} to type {:?}",
789 default_value, target_type
790 ),
791 })?,
792 )
793 } else {
794 None
795 };
796 column_meta.column_schema = column_meta
797 .column_schema
798 .clone()
799 .with_default_constraint(new_default.clone())
800 .with_context(|_| CastDefaultValueSnafu {
801 reason: format!("Failed to set new default: {:?}", new_default),
802 })?;
803 }
804 }
805
806 Ok(())
807 }
808
809 fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
810 let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
811 for option in &options {
812 set_index_map
813 .entry(option.column_name())
814 .or_default()
815 .push(option);
816 }
817
818 for column_metadata in self.column_metadatas.iter_mut() {
819 if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
820 for option in options {
821 Self::set_index(column_metadata, option)?;
822 }
823 }
824 }
825
826 Ok(())
827 }
828
829 fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
830 let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
831 for option in &options {
832 unset_index_map
833 .entry(option.column_name())
834 .or_default()
835 .push(option);
836 }
837
838 for column_metadata in self.column_metadatas.iter_mut() {
839 if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
840 for option in options {
841 Self::unset_index(column_metadata, option)?;
842 }
843 }
844 }
845
846 Ok(())
847 }
848
849 fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
850 match options {
851 SetIndexOption::Fulltext {
852 column_name,
853 options,
854 } => {
855 ensure!(
856 column_metadata.column_schema.data_type.is_string(),
857 InvalidColumnOptionSnafu {
858 column_name,
859 msg: "FULLTEXT index only supports string type".to_string(),
860 }
861 );
862 let current_fulltext_options = column_metadata
863 .column_schema
864 .fulltext_options()
865 .with_context(|_| GetFulltextOptionsSnafu {
866 column_name: column_name.clone(),
867 })?;
868 set_column_fulltext_options(
869 column_metadata,
870 column_name,
871 options,
872 current_fulltext_options,
873 )?;
874 }
875 SetIndexOption::Inverted { .. } => {
876 column_metadata.column_schema.set_inverted_index(true)
877 }
878 SetIndexOption::Skipping {
879 column_name,
880 options,
881 } => {
882 column_metadata
883 .column_schema
884 .set_skipping_options(options)
885 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
886 }
887 }
888
889 Ok(())
890 }
891
892 fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
893 match options {
894 UnsetIndexOption::Fulltext { column_name } => {
895 ensure!(
896 column_metadata.column_schema.data_type.is_string(),
897 InvalidColumnOptionSnafu {
898 column_name,
899 msg: "FULLTEXT index only supports string type".to_string(),
900 }
901 );
902
903 let current_fulltext_options = column_metadata
904 .column_schema
905 .fulltext_options()
906 .with_context(|_| GetFulltextOptionsSnafu {
907 column_name: column_name.clone(),
908 })?;
909
910 unset_column_fulltext_options(
911 column_metadata,
912 column_name,
913 current_fulltext_options,
914 )?;
915 }
916 UnsetIndexOption::Inverted { .. } => {
917 column_metadata.column_schema.set_inverted_index(false)
918 }
919 UnsetIndexOption::Skipping { column_name } => {
920 column_metadata
921 .column_schema
922 .unset_skipping_options()
923 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
924 }
925 }
926
927 Ok(())
928 }
929
930 fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
931 for name in column_names.iter() {
932 let meta = self
933 .column_metadatas
934 .iter_mut()
935 .find(|col| col.column_schema.name == *name);
936 if let Some(meta) = meta {
937 if !meta.column_schema.is_nullable() {
938 return InvalidRegionRequestSnafu {
939 region_id: self.region_id,
940 err: format!(
941 "column {name} is not nullable and `default` cannot be dropped",
942 ),
943 }
944 .fail();
945 }
946 meta.column_schema = meta
947 .column_schema
948 .clone()
949 .with_default_constraint(None)
950 .with_context(|_| CastDefaultValueSnafu {
951 reason: format!("Failed to drop default : {name:?}"),
952 })?;
953 } else {
954 return InvalidRegionRequestSnafu {
955 region_id: self.region_id,
956 err: format!("column {name} not found",),
957 }
958 .fail();
959 }
960 }
961 Ok(())
962 }
963
964 fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
965 for set_default in set_defaults.iter() {
966 let meta = self
967 .column_metadatas
968 .iter_mut()
969 .find(|col| col.column_schema.name == set_default.name);
970 if let Some(meta) = meta {
971 let default_constraint = common_sql::convert::deserialize_default_constraint(
972 set_default.default_constraint.as_slice(),
973 &meta.column_schema.name,
974 &meta.column_schema.data_type,
975 )
976 .context(SqlCommonSnafu)?;
977
978 meta.column_schema = meta
979 .column_schema
980 .clone()
981 .with_default_constraint(default_constraint)
982 .with_context(|_| CastDefaultValueSnafu {
983 reason: format!("Failed to set default : {set_default:?}"),
984 })?;
985 } else {
986 return InvalidRegionRequestSnafu {
987 region_id: self.region_id,
988 err: format!("column {} not found", set_default.name),
989 }
990 .fail();
991 }
992 }
993 Ok(())
994 }
995}
996
997struct SkippedFields {
999 schema: SchemaRef,
1001 time_index: ColumnId,
1003 id_to_index: HashMap<ColumnId, usize>,
1005}
1006
1007impl SkippedFields {
1008 fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
1010 let column_schemas = column_metadatas
1011 .iter()
1012 .map(|column_metadata| column_metadata.column_schema.clone())
1013 .collect();
1014 let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
1015 let time_index = column_metadatas
1016 .iter()
1017 .find_map(|col| {
1018 if col.semantic_type == SemanticType::Timestamp {
1019 Some(col.column_id)
1020 } else {
1021 None
1022 }
1023 })
1024 .context(InvalidMetaSnafu {
1025 reason: "time index not found",
1026 })?;
1027 let id_to_index = column_metadatas
1028 .iter()
1029 .enumerate()
1030 .map(|(idx, col)| (col.column_id, idx))
1031 .collect();
1032
1033 Ok(SkippedFields {
1034 schema,
1035 time_index,
1036 id_to_index,
1037 })
1038 }
1039}
1040
1041#[derive(Snafu)]
1042#[snafu(visibility(pub))]
1043#[stack_trace_debug]
1044pub enum MetadataError {
1045 #[snafu(display("Invalid schema"))]
1046 InvalidSchema {
1047 source: datatypes::error::Error,
1048 #[snafu(implicit)]
1049 location: Location,
1050 },
1051
1052 #[snafu(display("Invalid metadata, {}", reason))]
1053 InvalidMeta {
1054 reason: String,
1055 #[snafu(implicit)]
1056 location: Location,
1057 },
1058
1059 #[snafu(display("Failed to ser/de json object"))]
1060 SerdeJson {
1061 #[snafu(implicit)]
1062 location: Location,
1063 #[snafu(source)]
1064 error: serde_json::Error,
1065 },
1066
1067 #[snafu(display("Invalid raw region request, err: {}", err))]
1068 InvalidRawRegionRequest {
1069 err: String,
1070 #[snafu(implicit)]
1071 location: Location,
1072 },
1073
1074 #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1075 InvalidRegionRequest {
1076 region_id: RegionId,
1077 err: String,
1078 #[snafu(implicit)]
1079 location: Location,
1080 },
1081
1082 #[snafu(display("Unexpected schema error during project"))]
1083 SchemaProject {
1084 origin_schema: SchemaRef,
1085 projection: Vec<ColumnId>,
1086 #[snafu(implicit)]
1087 location: Location,
1088 source: datatypes::Error,
1089 },
1090
1091 #[snafu(display("Time index column not found"))]
1092 TimeIndexNotFound {
1093 #[snafu(implicit)]
1094 location: Location,
1095 },
1096
1097 #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1098 ChangeColumnNotFound {
1099 column_name: String,
1100 region_id: RegionId,
1101 #[snafu(implicit)]
1102 location: Location,
1103 },
1104
1105 #[snafu(display("Failed to convert column schema"))]
1106 ConvertColumnSchema {
1107 source: api::error::Error,
1108 #[snafu(implicit)]
1109 location: Location,
1110 },
1111
1112 #[snafu(display("Failed to convert TimeRanges"))]
1113 ConvertTimeRanges {
1114 source: api::error::Error,
1115 #[snafu(implicit)]
1116 location: Location,
1117 },
1118
1119 #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1120 InvalidSetRegionOptionRequest {
1121 key: String,
1122 value: String,
1123 #[snafu(implicit)]
1124 location: Location,
1125 },
1126
1127 #[snafu(display("Invalid set region option request, key: {}", key))]
1128 InvalidUnsetRegionOptionRequest {
1129 key: String,
1130 #[snafu(implicit)]
1131 location: Location,
1132 },
1133
1134 #[snafu(display("Failed to decode protobuf"))]
1135 DecodeProto {
1136 #[snafu(source)]
1137 error: prost::UnknownEnumValue,
1138 #[snafu(implicit)]
1139 location: Location,
1140 },
1141
1142 #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1143 InvalidColumnOption {
1144 column_name: String,
1145 msg: String,
1146 #[snafu(implicit)]
1147 location: Location,
1148 },
1149
1150 #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1151 SetFulltextOptions {
1152 column_name: String,
1153 source: datatypes::Error,
1154 #[snafu(implicit)]
1155 location: Location,
1156 },
1157
1158 #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1159 GetFulltextOptions {
1160 column_name: String,
1161 source: datatypes::Error,
1162 #[snafu(implicit)]
1163 location: Location,
1164 },
1165
1166 #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1167 SetSkippingIndexOptions {
1168 column_name: String,
1169 source: datatypes::Error,
1170 #[snafu(implicit)]
1171 location: Location,
1172 },
1173
1174 #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1175 UnsetSkippingIndexOptions {
1176 column_name: String,
1177 source: datatypes::Error,
1178 #[snafu(implicit)]
1179 location: Location,
1180 },
1181
1182 #[snafu(display("Failed to decode arrow ipc record batches"))]
1183 DecodeArrowIpc {
1184 #[snafu(source)]
1185 error: arrow::error::ArrowError,
1186 #[snafu(implicit)]
1187 location: Location,
1188 },
1189
1190 #[snafu(display("Failed to cast default value, reason: {}", reason))]
1191 CastDefaultValue {
1192 reason: String,
1193 source: datatypes::Error,
1194 #[snafu(implicit)]
1195 location: Location,
1196 },
1197
1198 #[snafu(display("Unexpected: {}", reason))]
1199 Unexpected {
1200 reason: String,
1201 #[snafu(implicit)]
1202 location: Location,
1203 },
1204
1205 #[snafu(display("Failed to encode/decode flight message"))]
1206 FlightCodec {
1207 source: common_grpc::Error,
1208 #[snafu(implicit)]
1209 location: Location,
1210 },
1211
1212 #[snafu(display("Invalid index option"))]
1213 InvalidIndexOption {
1214 #[snafu(implicit)]
1215 location: Location,
1216 #[snafu(source)]
1217 error: datatypes::error::Error,
1218 },
1219
1220 #[snafu(display("Sql common error"))]
1221 SqlCommon {
1222 source: common_sql::error::Error,
1223 #[snafu(implicit)]
1224 location: Location,
1225 },
1226}
1227
1228impl ErrorExt for MetadataError {
1229 fn status_code(&self) -> StatusCode {
1230 match self {
1231 Self::SqlCommon { source, .. } => source.status_code(),
1232 _ => StatusCode::InvalidArguments,
1233 }
1234 }
1235
1236 fn as_any(&self) -> &dyn Any {
1237 self
1238 }
1239}
1240
1241fn set_column_fulltext_options(
1250 column_meta: &mut ColumnMetadata,
1251 column_name: &str,
1252 options: &FulltextOptions,
1253 current_options: Option<FulltextOptions>,
1254) -> Result<()> {
1255 if let Some(current_options) = current_options {
1256 ensure!(
1257 current_options.analyzer == options.analyzer
1258 && current_options.case_sensitive == options.case_sensitive,
1259 InvalidColumnOptionSnafu {
1260 column_name,
1261 msg: format!(
1262 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1263 current_options.analyzer, current_options.case_sensitive
1264 ),
1265 }
1266 );
1267 }
1268
1269 column_meta
1270 .column_schema
1271 .set_fulltext_options(options)
1272 .context(SetFulltextOptionsSnafu { column_name })?;
1273
1274 Ok(())
1275}
1276
1277fn unset_column_fulltext_options(
1278 column_meta: &mut ColumnMetadata,
1279 column_name: &str,
1280 current_options: Option<FulltextOptions>,
1281) -> Result<()> {
1282 if let Some(mut current_options) = current_options
1283 && current_options.enable
1284 {
1285 current_options.enable = false;
1286 column_meta
1287 .column_schema
1288 .set_fulltext_options(¤t_options)
1289 .context(SetFulltextOptionsSnafu { column_name })?;
1290 } else {
1291 return InvalidColumnOptionSnafu {
1292 column_name,
1293 msg: "FULLTEXT index already disabled",
1294 }
1295 .fail();
1296 }
1297
1298 Ok(())
1299}
1300
1301#[cfg(test)]
1302mod test {
1303 use datatypes::prelude::ConcreteDataType;
1304 use datatypes::schema::{
1305 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1306 };
1307 use datatypes::value::Value;
1308
1309 use super::*;
1310
1311 fn create_builder() -> RegionMetadataBuilder {
1312 RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1313 }
1314
1315 fn build_test_region_metadata() -> RegionMetadata {
1316 let mut builder = create_builder();
1317 builder
1318 .push_column_metadata(ColumnMetadata {
1319 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1320 semantic_type: SemanticType::Tag,
1321 column_id: 1,
1322 })
1323 .push_column_metadata(ColumnMetadata {
1324 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1325 semantic_type: SemanticType::Field,
1326 column_id: 2,
1327 })
1328 .push_column_metadata(ColumnMetadata {
1329 column_schema: ColumnSchema::new(
1330 "c",
1331 ConcreteDataType::timestamp_millisecond_datatype(),
1332 false,
1333 ),
1334 semantic_type: SemanticType::Timestamp,
1335 column_id: 3,
1336 })
1337 .primary_key(vec![1])
1338 .partition_expr_json(Some("".to_string()));
1339 builder.build().unwrap()
1340 }
1341
/// Checks the time index lookup and `column_by_id` on the shared fixture.
#[test]
fn test_region_metadata() {
    let metadata = build_test_region_metadata();

    let time_index = metadata.time_index_column();
    assert_eq!("c", time_index.column_schema.name);

    let column_a = metadata.column_by_id(1).unwrap();
    assert_eq!("a", column_a.column_schema.name);

    // No column uses id 10 in the fixture.
    assert!(metadata.column_by_id(10).is_none());
}
1352
/// A metadata serialized to JSON must deserialize back to an equal value.
#[test]
fn test_region_metadata_serde() {
    let region_metadata = build_test_region_metadata();
    let serialized = serde_json::to_string(&region_metadata).unwrap();
    let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
    assert_eq!(region_metadata, deserialized);
}
1360
/// A time index column declared with a string data type must fail to build.
#[test]
fn test_column_metadata_validate() {
    let string_time_index = ColumnMetadata {
        column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(string_time_index);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("column `ts` is not timestamp type"),
        "unexpected err: {message}",
    );
}
1378
/// Building a metadata without any column must report the missing time index.
#[test]
fn test_empty_region_metadata() {
    let message = create_builder().build().unwrap_err().to_string();
    assert!(
        message.contains("time index not found"),
        "unexpected err: {message}",
    );
}
1389
/// Two columns sharing the same column id must be rejected by `build()`.
#[test]
fn test_same_column_id() {
    let columns = [
        ColumnMetadata {
            column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new(
                "b",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        },
    ];

    let mut builder = create_builder();
    for column in columns {
        builder.push_column_metadata(column);
    }

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("column a and b have the same column id"),
        "unexpected err: {message}",
    );
}
1415
/// Declaring two time index columns must be rejected by `build()`.
#[test]
fn test_duplicate_time_index() {
    // Builds a non-null millisecond timestamp column marked as time index.
    let time_index = |name: &str, column_id: ColumnId| ColumnMetadata {
        column_schema: ColumnSchema::new(
            name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index("a", 1));
    builder.push_column_metadata(time_index("b", 2));

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("expect only one time index"),
        "unexpected err: {message}",
    );
}
1444
/// A primary key that references a column id not in the metadata must be rejected.
#[test]
fn test_unknown_primary_key() {
    let tag = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag);
    builder.push_column_metadata(time_index);
    // Only ids 1 and 2 exist.
    builder.primary_key(vec![3]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("unknown column id 3"),
        "unexpected err: {message}",
    );
}
1470
/// Listing the same column twice in the primary key must be rejected.
#[test]
fn test_same_primary_key() {
    let mut builder = create_builder();

    let tag = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    builder.push_column_metadata(tag);

    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };
    builder.push_column_metadata(time_index);

    let duplicated_key = vec![1, 1];
    builder.primary_key(duplicated_key);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("duplicate column a in primary key"),
        "unexpected err: {message}",
    );
}
1497
/// The time index column must not be part of the primary key.
#[test]
fn test_in_time_index() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    builder.primary_key(vec![1]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("column ts is already a time index column"),
        "unexpected err: {message}",
    );
}
1519
/// A nullable time index column must be rejected by `build()`.
#[test]
fn test_nullable_time_index() {
    let nullable = true;
    let time_index_schema = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        nullable,
    );

    let mut builder = create_builder();
    builder.push_column_metadata(ColumnMetadata {
        column_schema: time_index_schema,
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("time index column ts must be NOT NULL"),
        "unexpected err: {message}",
    );
}
1539
/// A primary key column whose semantic type is `Field` must be rejected.
#[test]
fn test_primary_key_semantic_type() {
    let columns = [
        ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 2,
        },
    ];

    let mut builder = create_builder();
    for column in columns {
        builder.push_column_metadata(column);
    }
    // Column 2 is a field, not a tag.
    builder.primary_key(vec![2]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("semantic type of column a should be Tag, not Field"),
        "unexpected err: {message}",
    );
}
1566
/// The primary key must list as many columns as there are tag columns.
#[test]
fn test_primary_key_tag_num() {
    // Builds a nullable string tag column.
    let string_tag = |name: &str, column_id: ColumnId| ColumnMetadata {
        column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
        semantic_type: SemanticType::Tag,
        column_id,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    builder.push_column_metadata(string_tag("a", 2));
    builder.push_column_metadata(string_tag("b", 3));
    // Two tags are declared but only one is listed in the primary key.
    builder.primary_key(vec![2]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("number of primary key columns 1 not equal to tag columns 2"),
        "unexpected err: {message}",
    );
}
1598
/// `bump_version` must increase `schema_version` by one and change nothing else.
#[test]
fn test_bump_version() {
    let base = build_test_region_metadata();
    let mut expected = base.clone();
    expected.schema_version += 1;

    let mut builder = RegionMetadataBuilder::from_existing(base);
    builder.bump_version();
    let bumped = builder.build().unwrap();

    assert_eq!(expected, bumped);
}
1608
1609 fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1610 let semantic_type = if is_tag {
1611 SemanticType::Tag
1612 } else {
1613 SemanticType::Field
1614 };
1615 ColumnMetadata {
1616 column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1617 semantic_type,
1618 column_id,
1619 }
1620 }
1621
1622 fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1623 let actual: Vec<_> = metadata
1624 .column_metadatas
1625 .iter()
1626 .map(|col| &col.column_schema.name)
1627 .collect();
1628 assert_eq!(names, actual);
1629 }
1630
1631 fn get_columns_default_constraint(
1632 metadata: &RegionMetadata,
1633 name: String,
1634 ) -> Option<Option<&ColumnDefaultConstraint>> {
1635 metadata.column_metadatas.iter().find_map(|col| {
1636 if col.column_schema.name == name {
1637 Some(col.column_schema.default_constraint())
1638 } else {
1639 None
1640 }
1641 })
1642 }
1643
/// Walks one metadata through a sequence of alterations: adding columns at
/// different locations, dropping columns, setting and dropping a default
/// value, changing a column type, and setting then unsetting a FULLTEXT index.
#[test]
fn test_alter() {
    /// Applies `kind` on top of `metadata` and builds the altered metadata.
    fn alter_and_build(metadata: RegionMetadata, kind: AlterKind) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder.alter(kind).unwrap();
        builder.build().unwrap()
    }

    /// Creates an alteration that adds a single column.
    fn add_column(
        column_metadata: ColumnMetadata,
        location: Option<AddColumnLocation>,
    ) -> AlterKind {
        AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata,
                location,
            }],
        }
    }

    /// Creates an alteration that drops the columns in `names`.
    fn drop_columns(names: &[&str]) -> AlterKind {
        AlterKind::DropColumns {
            names: names.iter().map(|name| name.to_string()).collect(),
        }
    }

    // Add tag `d` without a location: appended to the columns and the primary key.
    let metadata = alter_and_build(
        build_test_region_metadata(),
        add_column(new_column_metadata("d", true, 4), None),
    );
    check_columns(&metadata, &["a", "b", "c", "d"]);
    assert_eq!([1, 4], &metadata.primary_key[..]);

    // Add field `e` as the first column.
    let metadata = alter_and_build(
        metadata,
        add_column(
            new_column_metadata("e", false, 5),
            Some(AddColumnLocation::First),
        ),
    );
    check_columns(&metadata, &["e", "a", "b", "c", "d"]);

    // Add field `f` right after `b`.
    let after_b = AddColumnLocation::After {
        column_name: "b".to_string(),
    };
    let metadata = alter_and_build(
        metadata,
        add_column(new_column_metadata("f", false, 6), Some(after_b)),
    );
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);

    // Add field `g` after the last column `d`.
    let after_d = AddColumnLocation::After {
        column_name: "d".to_string(),
    };
    let metadata = alter_and_build(
        metadata,
        add_column(new_column_metadata("g", false, 7), Some(after_d)),
    );
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);

    // Drop the fields `g` and `e`.
    let metadata = alter_and_build(metadata, drop_columns(&["g", "e"]));
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Dropping `a` passes `alter` but the following `build` fails.
    let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
    builder.alter(drop_columns(&["a"])).unwrap();
    let err = builder.build().unwrap_err();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());

    // Add field `g` again, this time with a default value.
    let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
    let mut column_g = new_column_metadata("g", false, 8);
    column_g.column_schema = column_g
        .column_schema
        .with_default_constraint(default_constraint.clone())
        .unwrap();
    let metadata = alter_and_build(metadata, add_column(column_g, None));
    assert_eq!(
        get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
        default_constraint.as_ref()
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

    // Drop the default value of `g`; the column itself stays.
    let metadata = alter_and_build(
        metadata,
        AlterKind::DropDefaults {
            names: vec!["g".to_string()],
        },
    );
    assert_eq!(
        get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
        None
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

    // Drop `g`.
    let metadata = alter_and_build(metadata, drop_columns(&["g"]));
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Change the type of `b` to string.
    let metadata = alter_and_build(
        metadata,
        AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnType {
                column_name: "b".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        },
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);
    let column_b = metadata.column_by_name("b").unwrap();
    assert_eq!(
        ConcreteDataType::string_datatype(),
        column_b.column_schema.data_type
    );

    // Set a FULLTEXT index on `b`.
    let fulltext = FulltextOptions::new_unchecked(
        true,
        FulltextAnalyzer::Chinese,
        true,
        FulltextBackend::Bloom,
        1000,
        0.01,
    );
    let metadata = alter_and_build(
        metadata,
        AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "b".to_string(),
                options: fulltext,
            }],
        },
    );
    let b_fulltext_options = metadata
        .column_by_name("b")
        .unwrap()
        .column_schema
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(b_fulltext_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, b_fulltext_options.analyzer);
    assert!(b_fulltext_options.case_sensitive);

    // Unset the FULLTEXT index on `b`: only `enable` flips.
    let metadata = alter_and_build(
        metadata,
        AlterKind::UnsetIndexes {
            options: vec![UnsetIndexOption::Fulltext {
                column_name: "b".to_string(),
            }],
        },
    );
    let b_fulltext_options = metadata
        .column_by_name("b")
        .unwrap()
        .column_schema
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(!b_fulltext_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, b_fulltext_options.analyzer);
    assert!(b_fulltext_options.case_sensitive);
}
1837
/// Adding a column that is listed twice, or that already exists, must leave
/// a single copy of that column in the metadata.
#[test]
fn test_add_if_not_exists() {
    // Describes a column appended without an explicit location.
    let append = |name: &str, is_tag: bool, column_id: ColumnId| AddColumn {
        column_metadata: new_column_metadata(name, is_tag, column_id),
        location: None,
    };

    // The same tag `d` appears twice in a single request.
    let duplicated_d = vec![append("d", true, 4), append("d", true, 4)];
    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder
        .alter(AlterKind::AddColumns {
            columns: duplicated_d,
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d"]);
    assert_eq!([1, 4], &metadata.primary_key[..]);

    // Column `b` is already part of the metadata.
    let existing_b = vec![append("b", false, 2)];
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder
        .alter(AlterKind::AddColumns {
            columns: existing_b,
        })
        .unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d"]);
}
1875
/// Adding a tag with the inverted index flag set must mark only that column
/// as inverted indexed.
#[test]
fn test_add_column_with_inverted_index() {
    let mut indexed_d = new_column_metadata("d", true, 4);
    indexed_d.column_schema.set_inverted_index(true);
    let plain_e = new_column_metadata("e", true, 5);

    let new_columns = vec![
        AddColumn {
            column_metadata: indexed_d,
            location: None,
        },
        AddColumn {
            column_metadata: plain_e,
            location: None,
        },
    ];

    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder
        .alter(AlterKind::AddColumns {
            columns: new_columns,
        })
        .unwrap();
    let metadata = builder.build().unwrap();

    check_columns(&metadata, &["a", "b", "c", "d", "e"]);
    assert_eq!([1, 4, 5], &metadata.primary_key[..]);

    // Only `d` was added with the inverted index flag.
    let expectations = [
        ("a", false),
        ("b", false),
        ("c", false),
        ("d", true),
        ("e", false),
    ];
    for (name, indexed) in expectations {
        let column = metadata.column_by_name(name).unwrap();
        assert_eq!(indexed, column.column_schema.is_inverted_indexed());
    }
}
1914
/// Dropping a column listed twice, or a column that no longer exists, must
/// succeed and remove the columns that are present.
#[test]
fn test_drop_if_exists() {
    /// Applies `kind` on top of `metadata` and builds the altered metadata.
    fn alter_and_build(metadata: RegionMetadata, kind: AlterKind) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder.alter(kind).unwrap();
        builder.build().unwrap()
    }

    // Start from the fixture plus the fields `d` and `e`.
    let extra_fields = vec![
        AddColumn {
            column_metadata: new_column_metadata("d", false, 4),
            location: None,
        },
        AddColumn {
            column_metadata: new_column_metadata("e", false, 5),
            location: None,
        },
    ];
    let metadata = alter_and_build(
        build_test_region_metadata(),
        AlterKind::AddColumns {
            columns: extra_fields,
        },
    );
    check_columns(&metadata, &["a", "b", "c", "d", "e"]);

    // `b` is listed twice in the same request.
    let metadata = alter_and_build(
        metadata,
        AlterKind::DropColumns {
            names: vec!["b".to_string(), "b".to_string()],
        },
    );
    check_columns(&metadata, &["a", "c", "d", "e"]);

    // `b` is already gone; `e` still exists.
    let metadata = alter_and_build(
        metadata,
        AlterKind::DropColumns {
            names: vec!["b".to_string(), "e".to_string()],
        },
    );
    check_columns(&metadata, &["a", "c", "d"]);
}
1956
/// A column named `__sequence` must be rejected by `build()`.
#[test]
fn test_invalid_column_name() {
    let reserved_name_column = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__sequence",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(reserved_name_column);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("internal column name that can not be used"),
        "unexpected err: {message}",
    );
}
1976
/// `build_without_validation` must accept a column named `__primary_key`.
#[test]
fn test_allow_internal_column_name() {
    let internal_tag = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__primary_key",
            ConcreteDataType::string_datatype(),
            false,
        ),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(internal_tag);
    builder.push_column_metadata(time_index);
    builder.primary_key(vec![1]);

    let metadata = builder.build_without_validation().unwrap();
    let first_column = &metadata.column_metadatas[0];
    assert_eq!("__primary_key", first_column.column_schema.name);
}
2007
/// The same invalid setup (a field column used as primary key) must build
/// with `build_without_validation` and fail with `build`.
#[test]
fn test_build_without_validation() {
    // Creates a builder whose primary key points at a field column.
    let field_as_primary_key = || {
        let mut builder = create_builder();
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        });
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("field", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 2,
        });
        builder.primary_key(vec![2]);
        builder
    };

    // Without validation the primary key is kept as given.
    let metadata = field_as_primary_key()
        .build_without_validation()
        .unwrap();
    assert_eq!(vec![2], metadata.primary_key);

    // With validation the semantic type check fails.
    let message = field_as_primary_key().build().unwrap_err().to_string();
    assert!(
        message.contains("semantic type of column field should be Tag"),
        "unexpected err: {message}"
    );
}
2066
/// Pins the `Debug` output of the shared metadata fixture, including the
/// compact per-column format.
#[test]
fn test_debug_for_column_metadata() {
    let expected = "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }";

    let metadata = build_test_region_metadata();
    let formatted = format!("{metadata:?}");

    assert_eq!(formatted, expected);
}
2076
/// JSON without a `primary_key_encoding` entry must deserialize to the dense
/// encoding; an explicit `"sparse"` entry must deserialize to the sparse one.
#[test]
fn test_region_metadata_deserialize_default_primary_key_encoding() {
    let parse = |json: &str| -> RegionMetadata { serde_json::from_str(json).unwrap() };

    // No `primary_key_encoding` entry at all.
    let without_encoding = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
    let metadata = parse(without_encoding);
    assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Dense);

    // Same metadata with `"primary_key_encoding":"sparse"`.
    let with_sparse_encoding = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
    let metadata = parse(with_sparse_encoding);
    assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
}
2090}