1use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use std::{fmt, mem};
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_base::hash::partition_expr_version;
28use common_error::ext::ErrorExt;
29use common_error::status_code::StatusCode;
30use common_macro::stack_trace_debug;
31use datatypes::arrow;
32use datatypes::arrow::datatypes::FieldRef;
33use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
34use datatypes::types::TimestampType;
35use itertools::Itertools;
36use serde::de::Error;
37use serde::{Deserialize, Deserializer, Serialize};
38use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
39
40use crate::codec::PrimaryKeyEncoding;
41use crate::region_request::{
42 AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
43};
44use crate::storage::consts::is_internal_column;
45use crate::storage::{ColumnId, RegionId};
46
/// Result type of this module, using [`MetadataError`] as the error type.
pub type Result<T> = std::result::Result<T, MetadataError>;
48
/// Metadata of a single column: its schema, semantic type and column id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column (name, data type, constraints and options).
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag, field or timestamp).
    pub semantic_type: SemanticType,
    /// Id of this column.
    pub column_id: ColumnId,
}
59
60impl fmt::Debug for ColumnMetadata {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(
63 f,
64 "[{:?} {:?} {:?}]",
65 self.column_schema, self.semantic_type, self.column_id,
66 )
67 }
68}
69
70impl ColumnMetadata {
71 pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
73 let column_id = column_def.column_id;
74 let column_def = column_def
75 .column_def
76 .context(InvalidRawRegionRequestSnafu {
77 err: "column_def is absent",
78 })?;
79 let semantic_type = column_def.semantic_type();
80 let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
81
82 Ok(Self {
83 column_schema,
84 semantic_type,
85 column_id,
86 })
87 }
88
89 pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
91 serde_json::to_vec(columns)
92 }
93
94 pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
96 serde_json::from_slice(bytes)
97 }
98
99 pub fn is_same_datatype(&self, other: &Self) -> bool {
100 self.column_schema.data_type == other.column_schema.data_type
101 }
102
103 pub fn estimated_size(&self) -> usize {
105 mem::size_of_val(self) - mem::size_of_val(&self.column_schema)
106 + self.column_schema.estimated_size()
107 }
108}
109
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Static metadata of a region.
///
/// The fields marked `#[serde(skip)]` are not serialized; the custom
/// `Deserialize` implementation and the builder recompute them from
/// `column_metadatas` and `partition_expr`.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Schema built from the column schemas in `column_metadatas`.
    #[serde(skip)]
    pub schema: SchemaRef,

    /// Id of the time index column (the column whose semantic type is `Timestamp`).
    #[serde(skip)]
    time_index: ColumnId,
    /// Maps a column id to the column's position in `column_metadatas`.
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns. Validation requires every one to be a tag column.
    pub primary_key: Vec<ColumnId>,

    /// Id of the region this metadata belongs to.
    pub region_id: RegionId,
    /// Version of the schema; the builder's `bump_version` increments it.
    pub schema_version: u64,

    /// Encoding of the primary key.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Partition expression of the region, if any.
    /// NOTE(review): presumably a JSON string (the builder setter is named
    /// `partition_expr_json`) — confirm.
    pub partition_expr: Option<String>,
    /// Value computed from `partition_expr` by `partition_expr_version`.
    #[serde(skip)]
    pub partition_expr_version: u64,
}
173
174impl fmt::Debug for RegionMetadata {
175 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
176 f.debug_struct("RegionMetadata")
177 .field("column_metadatas", &self.column_metadatas)
178 .field("time_index", &self.time_index)
179 .field("primary_key", &self.primary_key)
180 .field("region_id", &self.region_id)
181 .field("schema_version", &self.schema_version)
182 .field("partition_expr", &self.partition_expr)
183 .finish()
184 }
185}
186
/// Reference-counted pointer to a [`RegionMetadata`].
pub type RegionMetadataRef = Arc<RegionMetadata>;
188
189impl<'de> Deserialize<'de> for RegionMetadata {
190 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
191 where
192 D: Deserializer<'de>,
193 {
194 #[derive(Deserialize)]
196 struct RegionMetadataWithoutSchema {
197 column_metadatas: Vec<ColumnMetadata>,
198 primary_key: Vec<ColumnId>,
199 region_id: RegionId,
200 schema_version: u64,
201 #[serde(default)]
202 primary_key_encoding: PrimaryKeyEncoding,
203 #[serde(default)]
204 partition_expr: Option<String>,
205 }
206
207 let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
208 let skipped =
209 SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
210
211 let partition_expr_version =
212 partition_expr_version(without_schema.partition_expr.as_deref());
213
214 Ok(Self {
215 schema: skipped.schema,
216 time_index: skipped.time_index,
217 id_to_index: skipped.id_to_index,
218 column_metadatas: without_schema.column_metadatas,
219 primary_key: without_schema.primary_key,
220 region_id: without_schema.region_id,
221 schema_version: without_schema.schema_version,
222 primary_key_encoding: without_schema.primary_key_encoding,
223 partition_expr: without_schema.partition_expr,
224 partition_expr_version,
225 })
226 }
227}
228
229impl RegionMetadata {
230 pub fn from_json(s: &str) -> Result<Self> {
232 serde_json::from_str(s).context(SerdeJsonSnafu)
233 }
234
235 pub fn estimated_size(&self) -> usize {
237 mem::size_of_val(self)
238 + mem::size_of::<ColumnMetadata>() * self.column_metadatas.capacity()
239 + self
240 .column_metadatas
241 .iter()
242 .map(|column| column.estimated_size() - mem::size_of::<ColumnMetadata>())
243 .sum::<usize>()
244 + mem::size_of::<ColumnId>() * self.primary_key.capacity()
245 + mem::size_of::<(ColumnId, usize)>() * self.id_to_index.capacity()
246 + self.schema.estimated_size()
247 + self
248 .partition_expr
249 .as_ref()
250 .map(|expr| expr.capacity())
251 .unwrap_or_default()
252 }
253
254 pub fn to_json(&self) -> Result<String> {
256 serde_json::to_string(&self).context(SerdeJsonSnafu)
257 }
258
259 pub fn set_partition_expr(&mut self, expr: Option<String>) {
260 self.partition_expr_version = partition_expr_version(expr.as_deref());
261 self.partition_expr = expr;
262 }
263
264 pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
266 self.id_to_index
267 .get(&column_id)
268 .map(|index| &self.column_metadatas[*index])
269 }
270
    /// Returns the position of the column with `column_id` in
    /// `column_metadatas`, or `None` if the id is unknown.
    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.id_to_index.get(&column_id).copied()
    }
275
276 pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
278 self.column_metadatas
279 .iter()
280 .position(|col| col.column_schema.name == column_name)
281 }
282
283 pub fn time_index_column(&self) -> &ColumnMetadata {
288 let index = self.id_to_index[&self.time_index];
289 &self.column_metadatas[index]
290 }
291
292 pub fn time_index_type(&self) -> TimestampType {
297 let index = self.id_to_index[&self.time_index];
298 self.column_metadatas[index]
299 .column_schema
300 .data_type
301 .as_timestamp()
302 .unwrap()
303 }
304
    /// Returns the position of the time index column in `column_metadatas`.
    ///
    /// # Panics
    /// Panics if the time index id is missing from `id_to_index`.
    pub fn time_index_column_pos(&self) -> usize {
        self.id_to_index[&self.time_index]
    }
309
310 pub fn time_index_field(&self) -> FieldRef {
312 let index = self.id_to_index[&self.time_index];
313 self.schema.arrow_schema().fields[index].clone()
314 }
315
316 pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
318 self.schema
319 .column_index_by_name(name)
320 .map(|index| &self.column_metadatas[index])
321 }
322
323 pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
325 self.primary_key
327 .iter()
328 .map(|id| self.column_by_id(*id).unwrap())
329 }
330
331 pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
336 self.column_metadatas
337 .iter()
338 .filter(|column| column.semantic_type == SemanticType::Field)
339 }
340
341 pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
345 self.primary_key.iter().position(|id| *id == column_id)
346 }
347
348 pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
352 ensure!(
354 projection.contains(&self.time_index),
355 TimeIndexNotFoundSnafu
356 );
357
358 let indices_to_preserve = projection
360 .iter()
361 .map(|id| {
362 self.column_index_by_id(*id)
363 .with_context(|| InvalidRegionRequestSnafu {
364 region_id: self.region_id,
365 err: format!("column id {} not found", id),
366 })
367 })
368 .collect::<Result<Vec<_>>>()?;
369
370 let projected_schema =
372 self.schema
373 .try_project(&indices_to_preserve)
374 .with_context(|_| SchemaProjectSnafu {
375 origin_schema: self.schema.clone(),
376 projection: projection.to_vec(),
377 })?;
378
379 let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
381 let mut projected_primary_key = vec![];
382 let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
383 for index in indices_to_preserve {
384 let col = self.column_metadatas[index].clone();
385 if col.semantic_type == SemanticType::Tag {
386 projected_primary_key.push(col.column_id);
387 }
388 projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
389 projected_column_metadatas.push(col);
390 }
391
392 Ok(RegionMetadata {
393 schema: Arc::new(projected_schema),
394 time_index: self.time_index,
395 id_to_index: projected_id_to_index,
396 column_metadatas: projected_column_metadatas,
397 primary_key: projected_primary_key,
398 region_id: self.region_id,
399 schema_version: self.schema_version,
400 primary_key_encoding: self.primary_key_encoding,
401 partition_expr: self.partition_expr.clone(),
402 partition_expr_version: partition_expr_version(self.partition_expr.as_deref()),
403 })
404 }
405
406 pub fn inverted_indexed_column_ids<'a>(
408 &self,
409 ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
410 ) -> HashSet<ColumnId> {
411 let mut inverted_index = self
412 .column_metadatas
413 .iter()
414 .filter(|column| column.column_schema.is_inverted_indexed())
415 .map(|column| column.column_id)
416 .collect::<HashSet<_>>();
417
418 for ignored in ignore_column_ids {
419 inverted_index.remove(ignored);
420 }
421
422 inverted_index
423 }
424
425 pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
428 self.column_metadatas
429 .iter()
430 .filter_map(|column| {
431 column
432 .column_schema
433 .vector_index_options()
434 .ok()
435 .flatten()
436 .map(|options| (column.column_id, options))
437 })
438 .collect()
439 }
440
    /// Checks whether the metadata is valid.
    ///
    /// Verifies that column ids are unique, that there is exactly one time
    /// index column and it is not nullable, and that the primary key consists
    /// of distinct, existing tag columns and covers every tag column.
    ///
    /// # Errors
    /// Returns `InvalidMeta` describing the first violated rule.
    fn validate(&self) -> Result<()> {
        // Maps column id to column name; used to detect duplicated ids.
        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
        for col in &self.column_metadatas {
            // Validates each column on its own first.
            Self::validate_column_metadata(col)?;

            // Checks whether the column id is duplicated. The message is only
            // built on failure, so indexing `id_names` here cannot panic.
            ensure!(
                !id_names.contains_key(&col.column_id),
                InvalidMetaSnafu {
                    reason: format!(
                        "column {} and {} have the same column id {}",
                        id_names[&col.column_id], col.column_schema.name, col.column_id,
                    ),
                }
            );
            id_names.insert(col.column_id, &col.column_schema.name);
        }

        // Checks there is exactly one time index column.
        let time_indexes = self
            .column_metadatas
            .iter()
            .filter(|col| col.semantic_type == SemanticType::Timestamp)
            .collect::<Vec<_>>();
        ensure!(
            time_indexes.len() == 1,
            InvalidMetaSnafu {
                reason: format!(
                    "expect only one time index, found {}: {}",
                    time_indexes.len(),
                    time_indexes
                        .iter()
                        .map(|c| &c.column_schema.name)
                        .join(", ")
                ),
            }
        );

        // Checks the time index column is not nullable.
        ensure!(
            !self.time_index_column().column_schema.is_nullable(),
            InvalidMetaSnafu {
                reason: format!(
                    "time index column {} must be NOT NULL",
                    self.time_index_column().column_schema.name
                ),
            }
        );

        if !self.primary_key.is_empty() {
            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
            // Checks every column id in the primary key.
            for column_id in &self.primary_key {
                // Checks whether the column id exists.
                ensure!(
                    id_names.contains_key(column_id),
                    InvalidMetaSnafu {
                        reason: format!("unknown column id {}", column_id),
                    }
                );

                // The id was found in `id_names` above, so the column is expected to exist.
                let column = self.column_by_id(*column_id).unwrap();
                // Checks the column is not listed twice.
                ensure!(
                    !pk_ids.contains(&column_id),
                    InvalidMetaSnafu {
                        reason: format!(
                            "duplicate column {} in primary key",
                            column.column_schema.name
                        ),
                    }
                );

                // Checks this is not the time index column.
                ensure!(
                    *column_id != self.time_index,
                    InvalidMetaSnafu {
                        reason: format!(
                            "column {} is already a time index column",
                            column.column_schema.name,
                        ),
                    }
                );

                // Checks the semantic type.
                ensure!(
                    column.semantic_type == SemanticType::Tag,
                    InvalidMetaSnafu {
                        reason: format!(
                            "semantic type of column {} should be Tag, not {:?}",
                            column.column_schema.name, column.semantic_type
                        ),
                    }
                );

                pk_ids.insert(column_id);
            }
        }

        // Checks every tag column is part of the primary key: the counts must match.
        let num_tag = self
            .column_metadatas
            .iter()
            .filter(|col| col.semantic_type == SemanticType::Tag)
            .count();
        ensure!(
            num_tag == self.primary_key.len(),
            InvalidMetaSnafu {
                reason: format!(
                    "number of primary key columns {} not equal to tag columns {}",
                    self.primary_key.len(),
                    num_tag
                ),
            }
        );

        Ok(())
    }
564
565 fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
567 if column_metadata.semantic_type == SemanticType::Timestamp {
568 ensure!(
569 column_metadata.column_schema.data_type.is_timestamp(),
570 InvalidMetaSnafu {
571 reason: format!(
572 "column `{}` is not timestamp type",
573 column_metadata.column_schema.name
574 ),
575 }
576 );
577 }
578
579 ensure!(
580 !is_internal_column(&column_metadata.column_schema.name),
581 InvalidMetaSnafu {
582 reason: format!(
583 "{} is internal column name that can not be used",
584 column_metadata.column_schema.name
585 ),
586 }
587 );
588
589 Ok(())
590 }
591}
592
/// Builder to build [`RegionMetadata`].
pub struct RegionMetadataBuilder {
    /// Id of the region.
    region_id: RegionId,
    /// Columns collected so far.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns.
    primary_key: Vec<ColumnId>,
    /// Schema version of the metadata to build.
    schema_version: u64,
    /// Encoding of the primary key.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Partition expression of the region, if any.
    partition_expr: Option<String>,
}
602
603impl RegionMetadataBuilder {
604 pub fn new(id: RegionId) -> Self {
606 Self {
607 region_id: id,
608 column_metadatas: vec![],
609 primary_key: vec![],
610 schema_version: 0,
611 primary_key_encoding: PrimaryKeyEncoding::Dense,
612 partition_expr: None,
613 }
614 }
615
616 pub fn from_existing(existing: RegionMetadata) -> Self {
618 Self {
619 column_metadatas: existing.column_metadatas,
620 primary_key: existing.primary_key,
621 region_id: existing.region_id,
622 schema_version: existing.schema_version,
623 primary_key_encoding: existing.primary_key_encoding,
624 partition_expr: existing.partition_expr,
625 }
626 }
627
    /// Sets the primary key encoding and returns the builder for chaining.
    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
        self.primary_key_encoding = encoding;
        self
    }
633
    /// Sets the partition expression and returns the builder for chaining.
    /// NOTE(review): the parameter name suggests a JSON string; this method
    /// stores it without parsing or validation.
    pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
        self.partition_expr = expr_json;
        self
    }
639
    /// Appends a column to the end of the column list and returns the builder
    /// for chaining.
    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
        self.column_metadatas.push(column_metadata);
        self
    }
645
    /// Replaces the primary key with `key` and returns the builder for chaining.
    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
        self.primary_key = key;
        self
    }
651
    /// Increases the schema version by 1 and returns the builder for chaining.
    pub fn bump_version(&mut self) -> &mut Self {
        self.schema_version += 1;
        self
    }
657
658 pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
662 match kind {
663 AlterKind::AddColumns { columns } => self.add_columns(columns)?,
664 AlterKind::DropColumns { names } => self.drop_columns(&names),
665 AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
666 AlterKind::SetIndexes { options } => self.set_indexes(options)?,
667 AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
668 AlterKind::SetRegionOptions { options: _ } => {
669 }
671 AlterKind::UnsetRegionOptions { keys: _ } => {
672 }
674 AlterKind::DropDefaults { names } => {
675 self.drop_defaults(names)?;
676 }
677 AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
678 AlterKind::SyncColumns { column_metadatas } => {
679 self.primary_key = column_metadatas
680 .iter()
681 .filter_map(|column_metadata| {
682 if column_metadata.semantic_type == SemanticType::Tag {
683 Some(column_metadata.column_id)
684 } else {
685 None
686 }
687 })
688 .collect::<Vec<_>>();
689 self.column_metadatas = column_metadatas;
690 }
691 }
692 Ok(self)
693 }
694
    /// Consumes the builder and builds a [`RegionMetadata`], validating the
    /// result before returning it.
    pub fn build(self) -> Result<RegionMetadata> {
        self.build_with_options(true)
    }
699
    /// Consumes the builder and builds a [`RegionMetadata`] without running
    /// the metadata validation.
    ///
    /// Building can still fail, e.g. when the schema cannot be constructed or
    /// no time index column exists.
    pub fn build_without_validation(self) -> Result<RegionMetadata> {
        self.build_with_options(false)
    }
707
708 fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
709 let skipped = SkippedFields::new(&self.column_metadatas)?;
710
711 let partition_expr_version = partition_expr_version(self.partition_expr.as_deref());
712 let meta = RegionMetadata {
713 schema: skipped.schema,
714 time_index: skipped.time_index,
715 id_to_index: skipped.id_to_index,
716 column_metadatas: self.column_metadatas,
717 primary_key: self.primary_key,
718 region_id: self.region_id,
719 schema_version: self.schema_version,
720 primary_key_encoding: self.primary_key_encoding,
721 partition_expr: self.partition_expr,
722 partition_expr_version,
723 };
724
725 if validate {
726 meta.validate()?;
727 }
728
729 Ok(meta)
730 }
731
732 fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
734 let mut names: HashSet<_> = self
735 .column_metadatas
736 .iter()
737 .map(|col| col.column_schema.name.clone())
738 .collect();
739
740 for add_column in columns {
741 if names.contains(&add_column.column_metadata.column_schema.name) {
742 continue;
744 }
745
746 let column_id = add_column.column_metadata.column_id;
747 let semantic_type = add_column.column_metadata.semantic_type;
748 let column_name = add_column.column_metadata.column_schema.name.clone();
749 match add_column.location {
750 None => {
751 self.column_metadatas.push(add_column.column_metadata);
752 }
753 Some(AddColumnLocation::First) => {
754 self.column_metadatas.insert(0, add_column.column_metadata);
755 }
756 Some(AddColumnLocation::After { column_name }) => {
757 let pos = self
758 .column_metadatas
759 .iter()
760 .position(|col| col.column_schema.name == column_name)
761 .context(InvalidRegionRequestSnafu {
762 region_id: self.region_id,
763 err: format!(
764 "column {} not found, failed to add column {} after it",
765 column_name, add_column.column_metadata.column_schema.name
766 ),
767 })?;
768 self.column_metadatas
770 .insert(pos + 1, add_column.column_metadata);
771 }
772 }
773 names.insert(column_name);
774 if semantic_type == SemanticType::Tag {
775 self.primary_key.push(column_id);
777 }
778 }
779
780 Ok(())
781 }
782
783 fn drop_columns(&mut self, names: &[String]) {
785 let name_set: HashSet<_> = names.iter().collect();
786 self.column_metadatas
787 .retain(|col| !name_set.contains(&col.column_schema.name));
788 }
789
790 fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
792 let mut change_type_map: HashMap<_, _> = columns
793 .into_iter()
794 .map(
795 |ModifyColumnType {
796 column_name,
797 target_type,
798 }| (column_name, target_type),
799 )
800 .collect();
801
802 for column_meta in self.column_metadatas.iter_mut() {
803 if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
804 column_meta.column_schema.data_type = target_type.clone();
805 let new_default =
807 if let Some(default_value) = column_meta.column_schema.default_constraint() {
808 Some(
809 default_value
810 .cast_to_datatype(&target_type)
811 .with_context(|_| CastDefaultValueSnafu {
812 reason: format!(
813 "Failed to cast default value from {:?} to type {:?}",
814 default_value, target_type
815 ),
816 })?,
817 )
818 } else {
819 None
820 };
821 column_meta.column_schema = column_meta
822 .column_schema
823 .clone()
824 .with_default_constraint(new_default.clone())
825 .with_context(|_| CastDefaultValueSnafu {
826 reason: format!("Failed to set new default: {:?}", new_default),
827 })?;
828 }
829 }
830
831 Ok(())
832 }
833
834 fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
835 let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
836 for option in &options {
837 set_index_map
838 .entry(option.column_name())
839 .or_default()
840 .push(option);
841 }
842
843 for column_metadata in self.column_metadatas.iter_mut() {
844 if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
845 for option in options {
846 Self::set_index(column_metadata, option)?;
847 }
848 }
849 }
850
851 Ok(())
852 }
853
854 fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
855 let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
856 for option in &options {
857 unset_index_map
858 .entry(option.column_name())
859 .or_default()
860 .push(option);
861 }
862
863 for column_metadata in self.column_metadatas.iter_mut() {
864 if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
865 for option in options {
866 Self::unset_index(column_metadata, option)?;
867 }
868 }
869 }
870
871 Ok(())
872 }
873
874 fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
875 match options {
876 SetIndexOption::Fulltext {
877 column_name,
878 options,
879 } => {
880 ensure!(
881 column_metadata.column_schema.data_type.is_string(),
882 InvalidColumnOptionSnafu {
883 column_name,
884 msg: "FULLTEXT index only supports string type".to_string(),
885 }
886 );
887 let current_fulltext_options = column_metadata
888 .column_schema
889 .fulltext_options()
890 .with_context(|_| GetFulltextOptionsSnafu {
891 column_name: column_name.clone(),
892 })?;
893 set_column_fulltext_options(
894 column_metadata,
895 column_name,
896 options,
897 current_fulltext_options,
898 )?;
899 }
900 SetIndexOption::Inverted { .. } => {
901 column_metadata.column_schema.set_inverted_index(true)
902 }
903 SetIndexOption::Skipping {
904 column_name,
905 options,
906 } => {
907 column_metadata
908 .column_schema
909 .set_skipping_options(options)
910 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
911 }
912 }
913
914 Ok(())
915 }
916
917 fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
918 match options {
919 UnsetIndexOption::Fulltext { column_name } => {
920 ensure!(
921 column_metadata.column_schema.data_type.is_string(),
922 InvalidColumnOptionSnafu {
923 column_name,
924 msg: "FULLTEXT index only supports string type".to_string(),
925 }
926 );
927
928 let current_fulltext_options = column_metadata
929 .column_schema
930 .fulltext_options()
931 .with_context(|_| GetFulltextOptionsSnafu {
932 column_name: column_name.clone(),
933 })?;
934
935 unset_column_fulltext_options(
936 column_metadata,
937 column_name,
938 current_fulltext_options,
939 )?;
940 }
941 UnsetIndexOption::Inverted { .. } => {
942 column_metadata.column_schema.set_inverted_index(false)
943 }
944 UnsetIndexOption::Skipping { column_name } => {
945 column_metadata
946 .column_schema
947 .unset_skipping_options()
948 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
949 }
950 }
951
952 Ok(())
953 }
954
955 fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
956 for name in column_names.iter() {
957 let meta = self
958 .column_metadatas
959 .iter_mut()
960 .find(|col| col.column_schema.name == *name);
961 if let Some(meta) = meta {
962 if !meta.column_schema.is_nullable() {
963 return InvalidRegionRequestSnafu {
964 region_id: self.region_id,
965 err: format!(
966 "column {name} is not nullable and `default` cannot be dropped",
967 ),
968 }
969 .fail();
970 }
971 meta.column_schema = meta
972 .column_schema
973 .clone()
974 .with_default_constraint(None)
975 .with_context(|_| CastDefaultValueSnafu {
976 reason: format!("Failed to drop default : {name:?}"),
977 })?;
978 } else {
979 return InvalidRegionRequestSnafu {
980 region_id: self.region_id,
981 err: format!("column {name} not found",),
982 }
983 .fail();
984 }
985 }
986 Ok(())
987 }
988
989 fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
990 for set_default in set_defaults.iter() {
991 let meta = self
992 .column_metadatas
993 .iter_mut()
994 .find(|col| col.column_schema.name == set_default.name);
995 if let Some(meta) = meta {
996 let default_constraint = common_sql::convert::deserialize_default_constraint(
997 set_default.default_constraint.as_slice(),
998 &meta.column_schema.name,
999 &meta.column_schema.data_type,
1000 )
1001 .context(SqlCommonSnafu)?;
1002
1003 meta.column_schema = meta
1004 .column_schema
1005 .clone()
1006 .with_default_constraint(default_constraint)
1007 .with_context(|_| CastDefaultValueSnafu {
1008 reason: format!("Failed to set default : {set_default:?}"),
1009 })?;
1010 } else {
1011 return InvalidRegionRequestSnafu {
1012 region_id: self.region_id,
1013 err: format!("column {} not found", set_default.name),
1014 }
1015 .fail();
1016 }
1017 }
1018 Ok(())
1019 }
1020}
1021
/// Fields of [`RegionMetadata`] that are skipped during serialization and
/// recomputed from the column metadata.
struct SkippedFields {
    /// Schema built from the column schemas.
    schema: SchemaRef,
    /// Id of the time index column.
    time_index: ColumnId,
    /// Maps a column id to the column's position in the column list.
    id_to_index: HashMap<ColumnId, usize>,
}
1031
1032impl SkippedFields {
1033 fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
1035 let column_schemas = column_metadatas
1036 .iter()
1037 .map(|column_metadata| column_metadata.column_schema.clone())
1038 .collect();
1039 let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
1040 let time_index = column_metadatas
1041 .iter()
1042 .find_map(|col| {
1043 if col.semantic_type == SemanticType::Timestamp {
1044 Some(col.column_id)
1045 } else {
1046 None
1047 }
1048 })
1049 .context(InvalidMetaSnafu {
1050 reason: "time index not found",
1051 })?;
1052 let id_to_index = column_metadatas
1053 .iter()
1054 .enumerate()
1055 .map(|(idx, col)| (col.column_id, idx))
1056 .collect();
1057
1058 Ok(SkippedFields {
1059 schema,
1060 time_index,
1061 id_to_index,
1062 })
1063 }
1064}
1065
1066#[derive(Snafu)]
1067#[snafu(visibility(pub))]
1068#[stack_trace_debug]
1069pub enum MetadataError {
1070 #[snafu(display("Invalid schema"))]
1071 InvalidSchema {
1072 source: datatypes::error::Error,
1073 #[snafu(implicit)]
1074 location: Location,
1075 },
1076
1077 #[snafu(display("Invalid metadata, {}", reason))]
1078 InvalidMeta {
1079 reason: String,
1080 #[snafu(implicit)]
1081 location: Location,
1082 },
1083
1084 #[snafu(display("Failed to ser/de json object"))]
1085 SerdeJson {
1086 #[snafu(implicit)]
1087 location: Location,
1088 #[snafu(source)]
1089 error: serde_json::Error,
1090 },
1091
1092 #[snafu(display("Invalid raw region request, err: {}", err))]
1093 InvalidRawRegionRequest {
1094 err: String,
1095 #[snafu(implicit)]
1096 location: Location,
1097 },
1098
1099 #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1100 InvalidRegionRequest {
1101 region_id: RegionId,
1102 err: String,
1103 #[snafu(implicit)]
1104 location: Location,
1105 },
1106
1107 #[snafu(display("Unexpected schema error during project"))]
1108 SchemaProject {
1109 origin_schema: SchemaRef,
1110 projection: Vec<ColumnId>,
1111 #[snafu(implicit)]
1112 location: Location,
1113 source: datatypes::Error,
1114 },
1115
1116 #[snafu(display("Time index column not found"))]
1117 TimeIndexNotFound {
1118 #[snafu(implicit)]
1119 location: Location,
1120 },
1121
1122 #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1123 ChangeColumnNotFound {
1124 column_name: String,
1125 region_id: RegionId,
1126 #[snafu(implicit)]
1127 location: Location,
1128 },
1129
1130 #[snafu(display("Failed to convert column schema"))]
1131 ConvertColumnSchema {
1132 source: api::error::Error,
1133 #[snafu(implicit)]
1134 location: Location,
1135 },
1136
1137 #[snafu(display("Failed to convert TimeRanges"))]
1138 ConvertTimeRanges {
1139 source: api::error::Error,
1140 #[snafu(implicit)]
1141 location: Location,
1142 },
1143
1144 #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1145 InvalidSetRegionOptionRequest {
1146 key: String,
1147 value: String,
1148 #[snafu(implicit)]
1149 location: Location,
1150 },
1151
1152 #[snafu(display("Invalid set region option request, key: {}", key))]
1153 InvalidUnsetRegionOptionRequest {
1154 key: String,
1155 #[snafu(implicit)]
1156 location: Location,
1157 },
1158
1159 #[snafu(display("Failed to decode protobuf"))]
1160 DecodeProto {
1161 #[snafu(source)]
1162 error: prost::UnknownEnumValue,
1163 #[snafu(implicit)]
1164 location: Location,
1165 },
1166
1167 #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1168 InvalidColumnOption {
1169 column_name: String,
1170 msg: String,
1171 #[snafu(implicit)]
1172 location: Location,
1173 },
1174
1175 #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1176 SetFulltextOptions {
1177 column_name: String,
1178 source: datatypes::Error,
1179 #[snafu(implicit)]
1180 location: Location,
1181 },
1182
1183 #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1184 GetFulltextOptions {
1185 column_name: String,
1186 source: datatypes::Error,
1187 #[snafu(implicit)]
1188 location: Location,
1189 },
1190
1191 #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1192 SetSkippingIndexOptions {
1193 column_name: String,
1194 source: datatypes::Error,
1195 #[snafu(implicit)]
1196 location: Location,
1197 },
1198
1199 #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1200 UnsetSkippingIndexOptions {
1201 column_name: String,
1202 source: datatypes::Error,
1203 #[snafu(implicit)]
1204 location: Location,
1205 },
1206
1207 #[snafu(display("Failed to decode arrow ipc record batches"))]
1208 DecodeArrowIpc {
1209 #[snafu(source)]
1210 error: arrow::error::ArrowError,
1211 #[snafu(implicit)]
1212 location: Location,
1213 },
1214
1215 #[snafu(display("Failed to cast default value, reason: {}", reason))]
1216 CastDefaultValue {
1217 reason: String,
1218 source: datatypes::Error,
1219 #[snafu(implicit)]
1220 location: Location,
1221 },
1222
1223 #[snafu(display("Unexpected: {}", reason))]
1224 Unexpected {
1225 reason: String,
1226 #[snafu(implicit)]
1227 location: Location,
1228 },
1229
1230 #[snafu(display("Failed to encode/decode flight message"))]
1231 FlightCodec {
1232 source: common_grpc::Error,
1233 #[snafu(implicit)]
1234 location: Location,
1235 },
1236
1237 #[snafu(display("Invalid index option"))]
1238 InvalidIndexOption {
1239 #[snafu(implicit)]
1240 location: Location,
1241 #[snafu(source)]
1242 error: datatypes::error::Error,
1243 },
1244
1245 #[snafu(display("Sql common error"))]
1246 SqlCommon {
1247 source: common_sql::error::Error,
1248 #[snafu(implicit)]
1249 location: Location,
1250 },
1251}
1252
1253impl ErrorExt for MetadataError {
1254 fn status_code(&self) -> StatusCode {
1255 match self {
1256 Self::SqlCommon { source, .. } => source.status_code(),
1257 _ => StatusCode::InvalidArguments,
1258 }
1259 }
1260
1261 fn as_any(&self) -> &dyn Any {
1262 self
1263 }
1264}
1265
1266fn set_column_fulltext_options(
1275 column_meta: &mut ColumnMetadata,
1276 column_name: &str,
1277 options: &FulltextOptions,
1278 current_options: Option<FulltextOptions>,
1279) -> Result<()> {
1280 if let Some(current_options) = current_options {
1281 ensure!(
1282 current_options.analyzer == options.analyzer
1283 && current_options.case_sensitive == options.case_sensitive,
1284 InvalidColumnOptionSnafu {
1285 column_name,
1286 msg: format!(
1287 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1288 current_options.analyzer, current_options.case_sensitive
1289 ),
1290 }
1291 );
1292 }
1293
1294 column_meta
1295 .column_schema
1296 .set_fulltext_options(options)
1297 .context(SetFulltextOptionsSnafu { column_name })?;
1298
1299 Ok(())
1300}
1301
1302fn unset_column_fulltext_options(
1303 column_meta: &mut ColumnMetadata,
1304 column_name: &str,
1305 current_options: Option<FulltextOptions>,
1306) -> Result<()> {
1307 if let Some(mut current_options) = current_options
1308 && current_options.enable
1309 {
1310 current_options.enable = false;
1311 column_meta
1312 .column_schema
1313 .set_fulltext_options(¤t_options)
1314 .context(SetFulltextOptionsSnafu { column_name })?;
1315 } else {
1316 return InvalidColumnOptionSnafu {
1317 column_name,
1318 msg: "FULLTEXT index already disabled",
1319 }
1320 .fail();
1321 }
1322
1323 Ok(())
1324}
1325
1326#[cfg(test)]
1327mod test {
1328 use datatypes::prelude::ConcreteDataType;
1329 use datatypes::schema::{
1330 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1331 };
1332 use datatypes::value::Value;
1333
1334 use super::*;
1335
1336 fn create_builder() -> RegionMetadataBuilder {
1337 RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1338 }
1339
1340 fn build_test_region_metadata() -> RegionMetadata {
1341 let mut builder = create_builder();
1342 builder
1343 .push_column_metadata(ColumnMetadata {
1344 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1345 semantic_type: SemanticType::Tag,
1346 column_id: 1,
1347 })
1348 .push_column_metadata(ColumnMetadata {
1349 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1350 semantic_type: SemanticType::Field,
1351 column_id: 2,
1352 })
1353 .push_column_metadata(ColumnMetadata {
1354 column_schema: ColumnSchema::new(
1355 "c",
1356 ConcreteDataType::timestamp_millisecond_datatype(),
1357 false,
1358 ),
1359 semantic_type: SemanticType::Timestamp,
1360 column_id: 3,
1361 })
1362 .primary_key(vec![1])
1363 .partition_expr_json(Some("".to_string()));
1364 builder.build().unwrap()
1365 }
1366
/// Checks the time index lookup and the lookup of columns by id.
#[test]
fn test_region_metadata() {
    let metadata = build_test_region_metadata();

    let time_index = metadata.time_index_column();
    assert_eq!("c", time_index.column_schema.name);

    let column_a = metadata.column_by_id(1).unwrap();
    assert_eq!("a", column_a.column_schema.name);

    // No column uses id 10.
    assert_eq!(None, metadata.column_by_id(10));
}
1377
/// Serializes the test metadata to JSON and checks that deserializing it yields
/// an equal value.
#[test]
fn test_region_metadata_serde() {
    let region_metadata = build_test_region_metadata();
    // Serialize through a shared reference (the borrow operator was mis-encoded
    // in the previous text of this line).
    let serialized = serde_json::to_string(&region_metadata).unwrap();
    let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
    assert_eq!(region_metadata, deserialized);
}
1385
/// A column marked as time index but typed as string must be rejected.
#[test]
fn test_column_metadata_validate() {
    let string_typed_ts = ColumnMetadata {
        column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(string_typed_ts);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("column `ts` is not timestamp type"),
        "unexpected err: {err}",
    );
}
1403
/// Building without any column must fail because no time index exists.
#[test]
fn test_empty_region_metadata() {
    let err = create_builder().build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("time index not found"),
        "unexpected err: {err}",
    );
}
1414
/// Two columns sharing column id 1 must be rejected.
#[test]
fn test_same_column_id() {
    let tag_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    // Reuses the id of `a`.
    let time_index_b = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag_a);
    builder.push_column_metadata(time_index_b);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("column a and b have the same column id"),
        "unexpected err: {err}",
    );
}
1440
/// Two columns with the `Timestamp` semantic type must be rejected.
#[test]
fn test_duplicate_time_index() {
    let mut builder = create_builder();
    // Push two time index columns that differ only in name and id.
    for (name, column_id) in [("a", 1), ("b", 2)] {
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id,
        });
    }

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("expect only one time index"),
        "unexpected err: {err}",
    );
}
1469
/// A primary key that references an id no column uses must be rejected.
#[test]
fn test_unknown_primary_key() {
    let tag_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index_b = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag_a);
    builder.push_column_metadata(time_index_b);
    // Only ids 1 and 2 exist.
    builder.primary_key(vec![3]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("unknown column id 3"),
        "unexpected err: {err}",
    );
}
1495
/// Listing the same column twice in the primary key must be rejected.
#[test]
fn test_same_primary_key() {
    let tag_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index_b = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag_a);
    builder.push_column_metadata(time_index_b);
    // Column `a` appears twice.
    builder.primary_key(vec![1, 1]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("duplicate column a in primary key"),
        "unexpected err: {err}",
    );
}
1522
/// The time index column must not be listed in the primary key.
#[test]
fn test_in_time_index() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    // Id 1 is the time index itself.
    builder.primary_key(vec![1]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("column ts is already a time index column"),
        "unexpected err: {err}",
    );
}
1544
/// A nullable time index column must be rejected.
#[test]
fn test_nullable_time_index() {
    // The third argument marks the column as nullable.
    let nullable_ts = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );

    let mut builder = create_builder();
    builder.push_column_metadata(ColumnMetadata {
        column_schema: nullable_ts,
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("time index column ts must be NOT NULL"),
        "unexpected err: {err}",
    );
}
1564
/// A primary key column whose semantic type is `Field` must be rejected.
#[test]
fn test_primary_key_semantic_type() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };
    let field_a = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
        semantic_type: SemanticType::Field,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    builder.push_column_metadata(field_a);
    // Id 2 is the field column.
    builder.primary_key(vec![2]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("semantic type of column a should be Tag, not Field"),
        "unexpected err: {err}",
    );
}
1591
/// The primary key must list every tag column: two tags with a one-column key
/// must be rejected.
#[test]
fn test_primary_key_tag_num() {
    let mut builder = create_builder();
    builder.push_column_metadata(ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });
    // Two nullable string tags, `a` (id 2) and `b` (id 3).
    for (name, column_id) in [("a", 2), ("b", 3)] {
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id,
        });
    }
    // Only tag `a` is part of the key.
    builder.primary_key(vec![2]);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("number of primary key columns 1 not equal to tag columns 2"),
        "unexpected err: {err}",
    );
}
1623
/// `bump_version` must yield metadata that differs from the original only by a
/// `schema_version` increased by one.
#[test]
fn test_bump_version() {
    let original = build_test_region_metadata();

    let mut builder = RegionMetadataBuilder::from_existing(original.clone());
    builder.bump_version();
    let bumped = builder.build().unwrap();

    let mut expected = original;
    expected.schema_version += 1;
    assert_eq!(expected, bumped);
}
1633
1634 fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1635 let semantic_type = if is_tag {
1636 SemanticType::Tag
1637 } else {
1638 SemanticType::Field
1639 };
1640 ColumnMetadata {
1641 column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1642 semantic_type,
1643 column_id,
1644 }
1645 }
1646
1647 fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1648 let actual: Vec<_> = metadata
1649 .column_metadatas
1650 .iter()
1651 .map(|col| &col.column_schema.name)
1652 .collect();
1653 assert_eq!(names, actual);
1654 }
1655
1656 fn get_columns_default_constraint(
1657 metadata: &RegionMetadata,
1658 name: String,
1659 ) -> Option<Option<&ColumnDefaultConstraint>> {
1660 metadata.column_metadatas.iter().find_map(|col| {
1661 if col.column_schema.name == name {
1662 Some(col.column_schema.default_constraint())
1663 } else {
1664 None
1665 }
1666 })
1667 }
1668
/// Walks one metadata value through a sequence of alterations (add, drop,
/// default constraint, type change, fulltext index) and checks the outcome of
/// each step.
#[test]
fn test_alter() {
    /// Applies `kind` to a builder seeded from `metadata`; the builder is returned unbuilt.
    fn apply(metadata: RegionMetadata, kind: AlterKind) -> RegionMetadataBuilder {
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder.alter(kind).unwrap();
        builder
    }

    /// Wraps a single column into an `AlterKind::AddColumns` request.
    fn add_column(
        column_metadata: ColumnMetadata,
        location: Option<AddColumnLocation>,
    ) -> AlterKind {
        AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata,
                location,
            }],
        }
    }

    /// Builds an `AlterKind::DropColumns` request for the given names.
    fn drop_columns(names: &[&str]) -> AlterKind {
        AlterKind::DropColumns {
            names: names.iter().map(|&name| name.to_string()).collect(),
        }
    }

    // Starting point: a (tag), b (field), c (time index).
    let metadata = build_test_region_metadata();

    // Add tag `d` without a location: it goes last and joins the primary key.
    let request = add_column(new_column_metadata("d", true, 4), None);
    let metadata = apply(metadata, request).build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d"]);
    assert_eq!([1, 4], &metadata.primary_key[..]);

    // Add field `e` as the first column.
    let request = add_column(
        new_column_metadata("e", false, 5),
        Some(AddColumnLocation::First),
    );
    let metadata = apply(metadata, request).build().unwrap();
    check_columns(&metadata, &["e", "a", "b", "c", "d"]);

    // Add field `f` right after `b`.
    let request = add_column(
        new_column_metadata("f", false, 6),
        Some(AddColumnLocation::After {
            column_name: "b".to_string(),
        }),
    );
    let metadata = apply(metadata, request).build().unwrap();
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);

    // Add field `g` after the last column `d`.
    let request = add_column(
        new_column_metadata("g", false, 7),
        Some(AddColumnLocation::After {
            column_name: "d".to_string(),
        }),
    );
    let metadata = apply(metadata, request).build().unwrap();
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);

    // Drop fields `g` and `e` in one request.
    let metadata = apply(metadata, drop_columns(&["g", "e"]))
        .build()
        .unwrap();
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Dropping `a` (id 1, still listed in the primary key) makes the build fail.
    let err = apply(metadata.clone(), drop_columns(&["a"]))
        .build()
        .unwrap_err();
    assert_eq!(StatusCode::InvalidArguments, err.status_code());

    // Add field `g` again (id 8), this time with a default value.
    let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
    let mut column_g = new_column_metadata("g", false, 8);
    column_g.column_schema = column_g
        .column_schema
        .with_default_constraint(default_constraint.clone())
        .unwrap();
    let metadata = apply(metadata, add_column(column_g, None))
        .build()
        .unwrap();
    assert_eq!(
        get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
        default_constraint.as_ref()
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

    // Drop the default of `g`; the column itself stays.
    let request = AlterKind::DropDefaults {
        names: vec!["g".to_string()],
    };
    let metadata = apply(metadata, request).build().unwrap();
    assert_eq!(
        get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
        None
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

    // Drop `g` once more.
    let metadata = apply(metadata, drop_columns(&["g"])).build().unwrap();
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Change the type of `b` to string.
    let request = AlterKind::ModifyColumnTypes {
        columns: vec![ModifyColumnType {
            column_name: "b".to_string(),
            target_type: ConcreteDataType::string_datatype(),
        }],
    };
    let metadata = apply(metadata, request).build().unwrap();
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);
    let type_of_b = &metadata.column_by_name("b").unwrap().column_schema.data_type;
    assert_eq!(ConcreteDataType::string_datatype(), *type_of_b);

    // Enable a fulltext index on `b`.
    let request = AlterKind::SetIndexes {
        options: vec![SetIndexOption::Fulltext {
            column_name: "b".to_string(),
            options: FulltextOptions::new_unchecked(
                true,
                FulltextAnalyzer::Chinese,
                true,
                FulltextBackend::Bloom,
                1000,
                0.01,
            ),
        }],
    };
    let metadata = apply(metadata, request).build().unwrap();
    let enabled_options = metadata
        .column_by_name("b")
        .unwrap()
        .column_schema
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(enabled_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, enabled_options.analyzer);
    assert!(enabled_options.case_sensitive);

    // Unset the fulltext index on `b`: only `enable` flips, the rest is kept.
    let request = AlterKind::UnsetIndexes {
        options: vec![UnsetIndexOption::Fulltext {
            column_name: "b".to_string(),
        }],
    };
    let metadata = apply(metadata, request).build().unwrap();
    let disabled_options = metadata
        .column_by_name("b")
        .unwrap()
        .column_schema
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(!disabled_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, disabled_options.analyzer);
    assert!(disabled_options.case_sensitive);
}
1862
/// Adding a column that is already present (in the same request or in the
/// existing metadata) must not produce a second copy of it.
#[test]
fn test_add_if_not_exists() {
    // The same tag `d` (id 4) is requested twice in one alteration.
    let duplicated_request = AlterKind::AddColumns {
        columns: (0..2)
            .map(|_| AddColumn {
                column_metadata: new_column_metadata("d", true, 4),
                location: None,
            })
            .collect(),
    };
    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder.alter(duplicated_request).unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d"]);
    assert_eq!([1, 4], &metadata.primary_key[..]);

    // `b` (id 2) already exists in the metadata.
    let existing_request = AlterKind::AddColumns {
        columns: vec![AddColumn {
            column_metadata: new_column_metadata("b", false, 2),
            location: None,
        }],
    };
    let mut builder = RegionMetadataBuilder::from_existing(metadata);
    builder.alter(existing_request).unwrap();
    let metadata = builder.build().unwrap();
    check_columns(&metadata, &["a", "b", "c", "d"]);
}
1900
/// Adds tags `d` (inverted index set) and `e` (not set) and checks that only
/// `d` is reported as inverted indexed afterwards.
#[test]
fn test_add_column_with_inverted_index() {
    let mut indexed_d = new_column_metadata("d", true, 4);
    indexed_d.column_schema.set_inverted_index(true);
    let plain_e = new_column_metadata("e", true, 5);

    let request = AlterKind::AddColumns {
        columns: vec![
            AddColumn {
                column_metadata: indexed_d,
                location: None,
            },
            AddColumn {
                column_metadata: plain_e,
                location: None,
            },
        ],
    };
    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder.alter(request).unwrap();
    let metadata = builder.build().unwrap();

    check_columns(&metadata, &["a", "b", "c", "d", "e"]);
    assert_eq!([1, 4, 5], &metadata.primary_key[..]);

    let expectations = [
        ("a", false),
        ("b", false),
        ("c", false),
        ("d", true),
        ("e", false),
    ];
    for (name, inverted_indexed) in expectations {
        let column = metadata.column_by_name(name).unwrap();
        assert_eq!(inverted_indexed, column.column_schema.is_inverted_indexed());
    }
}
1939
/// Dropping a column that is named twice, or that no longer exists, must
/// succeed and remove whatever is present.
#[test]
fn test_drop_if_exists() {
    /// Applies `kind` on top of `metadata` and builds the result.
    fn altered(metadata: RegionMetadata, kind: AlterKind) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder.alter(kind).unwrap();
        builder.build().unwrap()
    }

    // Add fields `d` and `e` so there is something to drop.
    let add_fields = AlterKind::AddColumns {
        columns: vec![
            AddColumn {
                column_metadata: new_column_metadata("d", false, 4),
                location: None,
            },
            AddColumn {
                column_metadata: new_column_metadata("e", false, 5),
                location: None,
            },
        ],
    };
    let metadata = altered(build_test_region_metadata(), add_fields);
    check_columns(&metadata, &["a", "b", "c", "d", "e"]);

    // `b` is named twice in the same request.
    let drop_b_twice = AlterKind::DropColumns {
        names: vec!["b".to_string(), "b".to_string()],
    };
    let metadata = altered(metadata, drop_b_twice);
    check_columns(&metadata, &["a", "c", "d", "e"]);

    // `b` is already gone; `e` still exists.
    let drop_missing_and_e = AlterKind::DropColumns {
        names: vec!["b".to_string(), "e".to_string()],
    };
    let metadata = altered(metadata, drop_missing_and_e);
    check_columns(&metadata, &["a", "c", "d"]);
}
1981
/// A column named `__sequence` must be rejected as an internal column name.
#[test]
fn test_invalid_column_name() {
    let internal_named = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__sequence",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(internal_named);

    let err = builder.build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("internal column name that can not be used"),
        "unexpected err: {err}",
    );
}
2001
/// `build_without_validation` must accept a tag column named `__primary_key`.
#[test]
fn test_allow_internal_column_name() {
    let internal_tag = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__primary_key",
            ConcreteDataType::string_datatype(),
            false,
        ),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(internal_tag);
    builder.push_column_metadata(time_index);
    builder.primary_key(vec![1]);

    let metadata = builder.build_without_validation().unwrap();
    let first_column = &metadata.column_metadatas[0];
    assert_eq!("__primary_key", first_column.column_schema.name);
}
2032
/// The same setup (a `Field` column used as primary key) must build with
/// `build_without_validation` but fail with the validating `build`.
#[test]
fn test_build_without_validation() {
    /// Creates a builder with time index `ts` (id 1), field `field` (id 2) and
    /// the field's id as primary key.
    fn builder_with_field_as_key() -> RegionMetadataBuilder {
        let mut builder = create_builder();
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder
    }

    // Without validation the primary key is kept as given.
    let metadata = builder_with_field_as_key()
        .build_without_validation()
        .unwrap();
    assert_eq!(vec![2], metadata.primary_key);

    // With validation the semantic type check fails.
    let err = builder_with_field_as_key().build().unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("semantic type of column field should be Tag"),
        "unexpected err: {err}"
    );
}
2091
/// Pins the exact `Debug` rendering of the test region metadata, including the
/// bracketed rendering of each column.
#[test]
fn test_debug_for_column_metadata() {
    let expected = "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }";

    let metadata = build_test_region_metadata();
    let formatted = format!("{metadata:?}");
    assert_eq!(formatted, expected);
}
2101
/// JSON without a `primary_key_encoding` entry must deserialize to `Dense`,
/// while an explicit `"sparse"` entry must deserialize to `Sparse`.
#[test]
fn test_region_metadata_deserialize_default_primary_key_encoding() {
    let cases = [
        // No `primary_key_encoding` key at all.
        (
            r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#,
            PrimaryKeyEncoding::Dense,
        ),
        // Explicit sparse encoding.
        (
            r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#,
            PrimaryKeyEncoding::Sparse,
        ),
    ];

    for (json, expected_encoding) in cases {
        let deserialized: RegionMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(deserialized.primary_key_encoding, expected_encoding);
    }
}
2115}