1use std::collections::{HashMap, HashSet};
16use std::fmt;
17
18use api::v1::SemanticType;
19use datatypes::schema::ColumnSchema;
20use snafu::{ensure, OptionExt};
21use store_api::metadata::{ColumnMetadata, RegionMetadata};
22use store_api::storage::{RegionId, TableId};
23use table::metadata::RawTableMeta;
24use table::table_reference::TableReference;
25
26use crate::error::{
27 MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
28};
29
30#[derive(Debug, PartialEq, Eq)]
31struct PartialRegionMetadata<'a> {
32 column_metadatas: &'a [ColumnMetadata],
33 primary_key: &'a [u32],
34 table_id: TableId,
35}
36
37impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
38 fn from(region_metadata: &'a RegionMetadata) -> Self {
39 Self {
40 column_metadatas: ®ion_metadata.column_metadatas,
41 primary_key: ®ion_metadata.primary_key,
42 table_id: region_metadata.region_id.table_id(),
43 }
44 }
45}
46
47struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata);
49
50impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 let col = self.0;
53 write!(
54 f,
55 "Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}",
56 col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type,
57 )
58 }
59}
60
61pub(crate) fn check_column_metadatas_consistent(
71 region_metadatas: &[RegionMetadata],
72) -> Option<Vec<ColumnMetadata>> {
73 let is_column_metadata_consistent = region_metadatas
74 .windows(2)
75 .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
76
77 if !is_column_metadata_consistent {
78 return None;
79 }
80
81 Some(region_metadatas[0].column_metadatas.clone())
82}
83
84pub(crate) fn resolve_column_metadatas_with_metasrv(
92 column_metadatas: &[ColumnMetadata],
93 region_metadatas: &[RegionMetadata],
94) -> Result<Vec<RegionId>> {
95 let is_same_table = region_metadatas
96 .windows(2)
97 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
98
99 ensure!(
100 is_same_table,
101 UnexpectedSnafu {
102 err_msg: "Region metadatas are not from the same table"
103 }
104 );
105
106 let mut regions_ids = vec![];
107 for region_metadata in region_metadatas {
108 if region_metadata.column_metadatas != column_metadatas {
109 let is_invariant_preserved = check_column_metadata_invariants(
110 column_metadatas,
111 ®ion_metadata.column_metadatas,
112 );
113 ensure!(
114 is_invariant_preserved,
115 UnexpectedSnafu {
116 err_msg: format!(
117 "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
118 region_metadata.region_id,
119 column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
120 region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
121 )
122 }
123 );
124 regions_ids.push(region_metadata.region_id);
125 }
126 }
127 Ok(regions_ids)
128}
129
130pub(crate) fn resolve_column_metadatas_with_latest(
138 region_metadatas: &[RegionMetadata],
139) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
140 let is_same_table = region_metadatas
141 .windows(2)
142 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
143
144 ensure!(
145 is_same_table,
146 UnexpectedSnafu {
147 err_msg: "Region metadatas are not from the same table"
148 }
149 );
150
151 let latest_region_metadata = region_metadatas
152 .iter()
153 .max_by_key(|c| c.schema_version)
154 .context(UnexpectedSnafu {
155 err_msg: "All Region metadatas have the same schema version",
156 })?;
157 let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
158
159 let mut region_ids = vec![];
160 for region_metadata in region_metadatas {
161 if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
162 let is_invariant_preserved = check_column_metadata_invariants(
163 &latest_region_metadata.column_metadatas,
164 ®ion_metadata.column_metadatas,
165 );
166 ensure!(
167 is_invariant_preserved,
168 UnexpectedSnafu {
169 err_msg: format!(
170 "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
171 region_metadata.region_id,
172 latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
173 region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>()
174 )
175 }
176 );
177 region_ids.push(region_metadata.region_id);
178 }
179 }
180
181 Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
183}
184
185pub(crate) fn build_column_metadata_from_table_info(
193 column_schemas: &[ColumnSchema],
194 primary_key_indexes: &[usize],
195 name_to_ids: &HashMap<String, u32>,
196) -> Result<Vec<ColumnMetadata>> {
197 let primary_names = primary_key_indexes
198 .iter()
199 .map(|i| column_schemas[*i].name.as_str())
200 .collect::<HashSet<_>>();
201
202 column_schemas
203 .iter()
204 .map(|column_schema| {
205 let column_id = *name_to_ids
206 .get(column_schema.name.as_str())
207 .with_context(|| UnexpectedSnafu {
208 err_msg: format!(
209 "Column name {} not found in name_to_ids",
210 column_schema.name
211 ),
212 })?;
213
214 let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
215 SemanticType::Tag
216 } else if column_schema.is_time_index() {
217 SemanticType::Timestamp
218 } else {
219 SemanticType::Field
220 };
221 Ok(ColumnMetadata {
222 column_schema: column_schema.clone(),
223 semantic_type,
224 column_id,
225 })
226 })
227 .collect::<Result<Vec<_>>>()
228}
229
230pub(crate) fn check_column_metadata_invariants(
236 new_column_metadatas: &[ColumnMetadata],
237 column_metadatas: &[ColumnMetadata],
238) -> bool {
239 let new_primary_keys = new_column_metadatas
240 .iter()
241 .filter(|c| c.semantic_type == SemanticType::Tag)
242 .map(|c| (c.column_schema.name.as_str(), c.column_id))
243 .collect::<HashMap<_, _>>();
244
245 let old_primary_keys = column_metadatas
246 .iter()
247 .filter(|c| c.semantic_type == SemanticType::Tag)
248 .map(|c| (c.column_schema.name.as_str(), c.column_id));
249
250 for (name, id) in old_primary_keys {
251 if new_primary_keys.get(name) != Some(&id) {
252 return false;
253 }
254 }
255
256 let new_ts_column = new_column_metadatas
257 .iter()
258 .find(|c| c.semantic_type == SemanticType::Timestamp)
259 .map(|c| (c.column_schema.name.as_str(), c.column_id));
260
261 let old_ts_column = column_metadatas
262 .iter()
263 .find(|c| c.semantic_type == SemanticType::Timestamp)
264 .map(|c| (c.column_schema.name.as_str(), c.column_id));
265
266 new_ts_column == old_ts_column
267}
268
269pub(crate) fn build_table_meta_from_column_metadatas(
277 table_id: TableId,
278 table_ref: TableReference,
279 table_meta: &RawTableMeta,
280 name_to_ids: &HashMap<String, u32>,
281 column_metadata: &[ColumnMetadata],
282) -> Result<RawTableMeta> {
283 let column_in_column_metadata = column_metadata
284 .iter()
285 .map(|c| (c.column_schema.name.as_str(), c))
286 .collect::<HashMap<_, _>>();
287 let primary_key_names = table_meta
288 .primary_key_indices
289 .iter()
290 .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
291 .collect::<HashSet<_>>();
292 let partition_key_names = table_meta
293 .partition_key_indices
294 .iter()
295 .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
296 .collect::<HashSet<_>>();
297 ensure!(
298 column_metadata
299 .iter()
300 .any(|c| c.semantic_type == SemanticType::Timestamp),
301 UnexpectedSnafu {
302 err_msg: format!(
303 "Missing table index in column metadata, table: {}, table_id: {}",
304 table_ref, table_id
305 ),
306 }
307 );
308
309 for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
311 let column_in_column_metadata =
312 column_in_column_metadata
313 .get(column_name)
314 .with_context(|| MissingColumnInColumnMetadataSnafu {
315 column_name: column_name.to_string(),
316 table_name: table_ref.to_string(),
317 table_id,
318 })?;
319
320 let column_id = *name_to_ids
321 .get(*column_name)
322 .with_context(|| UnexpectedSnafu {
323 err_msg: format!("column id not found in name_to_ids: {}", column_name),
324 })?;
325 ensure!(
326 column_id == column_in_column_metadata.column_id,
327 MismatchColumnIdSnafu {
328 column_name: column_name.to_string(),
329 column_id,
330 table_name: table_ref.to_string(),
331 table_id,
332 }
333 );
334 }
335
336 let mut new_raw_table_meta = table_meta.clone();
337 let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
338 let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
339 let value_indices = &mut new_raw_table_meta.value_indices;
340 let time_index = &mut new_raw_table_meta.schema.timestamp_index;
341 let columns = &mut new_raw_table_meta.schema.column_schemas;
342 let column_ids = &mut new_raw_table_meta.column_ids;
343
344 column_ids.clear();
345 value_indices.clear();
346 columns.clear();
347 primary_key_indices.clear();
348 partition_key_indices.clear();
349
350 for (idx, col) in column_metadata.iter().enumerate() {
351 if partition_key_names.contains(&col.column_schema.name.as_str()) {
352 partition_key_indices.push(idx);
353 }
354 match col.semantic_type {
355 SemanticType::Tag => {
356 primary_key_indices.push(idx);
357 }
358 SemanticType::Field => {
359 value_indices.push(idx);
360 }
361 SemanticType::Timestamp => {
362 value_indices.push(idx);
363 *time_index = Some(idx);
364 }
365 }
366
367 columns.push(col.column_schema.clone());
368 column_ids.push(col.column_id);
369 }
370
371 if let Some(time_index) = *time_index {
372 new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
373 }
374
375 Ok(new_raw_table_meta)
376}
377
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use store_api::metadata::ColumnMetadata;
    use table::metadata::{RawTableMeta, TableMetaBuilder};
    use table::table_reference::TableReference;

    use super::*;
    use crate::ddl::test_util::region_metadata::build_region_metadata;
    use crate::error::Error;

    /// Schema with columns `col1` (int32), `ts` (millisecond timestamp, marked as time
    /// index) and `col2` (int32), built at schema version 123.
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Column metadata mirroring `new_test_schema`: `col1` is a tag (id 0), `ts` the
    /// timestamp (id 1) and `col2` a field (id 2).
    fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            },
        ]
    }

    /// Raw table meta over `new_test_schema` with primary key `col1` (index 0), partition
    /// key `col2` (index 2) and `next_column_id` set to 4.
    fn new_test_raw_table_info() -> RawTableMeta {
        let mut table_meta_builder = TableMetaBuilder::empty();
        let table_meta = table_meta_builder
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .partition_key_indices(vec![2])
            .next_column_id(4)
            .build()
            .unwrap();

        table_meta.into()
    }

    // Adding a new tag `col3` (id 3) must extend the primary key and column ids while
    // keeping the partition key, value indices and time index in place.
    #[test]
    fn test_build_table_info_from_column_metadatas() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 3,
        });

        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let new_table_meta = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            &name_to_ids,
            &column_metadatas,
        )
        .unwrap();

        assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
        assert_eq!(new_table_meta.partition_key_indices, vec![2]);
        assert_eq!(new_table_meta.value_indices, vec![1, 2]);
        assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
        assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
    }

    // A key column whose id in `name_to_ids` disagrees with the column metadata must be
    // rejected with `MismatchColumnId`.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
        let column_metadatas = new_test_column_metadatas();
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 3),
            // `col2` (the partition key) has id 2 in the column metadata, not 3.
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            &name_to_ids,
            &column_metadatas,
        )
        .unwrap_err();

        assert_matches!(err, Error::MismatchColumnId { .. });
    }

    // Column metadata without a `Timestamp` column must be rejected.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            &name_to_ids,
            &column_metadatas,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("Missing table index in column metadata"),
            "err: {}",
            err
        );
    }

    // Dropping either a primary-key or a partition-key column from the column metadata
    // must be rejected with `MissingColumnInColumnMetadata`.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_missing_column() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.retain(|c| c.column_id != 0);
        // Removed `col1`, the primary key column.
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            &name_to_ids,
            &column_metadatas,
        )
        .unwrap_err();
        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });

        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.retain(|c| c.column_id != 2);
        // Removed `col2`, the partition key column.

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            &name_to_ids,
            &column_metadatas,
        )
        .unwrap_err();
        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
    }

    // Identical regions of one table are consistent; regions whose ids point at different
    // tables (1025 vs 1024) are not, even with identical columns.
    #[test]
    fn test_check_column_metadatas_consistent() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
        let result =
            check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
        assert_eq!(result, column_metadatas);

        let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
        let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
        assert!(result.is_none());
    }

    // Adding a plain field column keeps the invariants.
    #[test]
    fn test_check_column_metadata_invariants() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });
        assert!(check_column_metadata_invariants(
            &new_column_metadatas,
            &column_metadatas
        ));
    }

    // Removing the timestamp column or a tag column violates the invariants.
    #[test]
    fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
        assert!(!check_column_metadata_invariants(
            &new_column_metadatas,
            &column_metadatas
        ));

        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
        assert!(!check_column_metadata_invariants(
            &new_column_metadatas,
            &column_metadatas
        ));
    }

    // Changing the column id of the timestamp column or of a tag column violates the
    // invariants.
    #[test]
    fn test_check_column_metadata_invariants_mismatch_column_id() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        if let Some(col) = new_column_metadatas
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Timestamp)
        {
            col.column_id = 100;
        }
        assert!(!check_column_metadata_invariants(
            &new_column_metadatas,
            &column_metadatas
        ));

        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        if let Some(col) = new_column_metadatas
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Tag)
        {
            col.column_id = 100;
        }
        assert!(!check_column_metadata_invariants(
            &new_column_metadatas,
            &column_metadatas
        ));
    }

    // A region lacking the field `col3` that the target metadata has is reported as
    // needing resolution.
    #[test]
    fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
        metasrv_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });
        let result =
            resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
                .unwrap();

        assert_eq!(result, vec![RegionId::new(1024, 0)]);
    }

    // Region 1 has schema version 2 and an extra field. Its columns become the resolved
    // metadata, and region 0 is reported as needing resolution.
    #[test]
    fn test_resolve_column_metadatas_with_use_latest_strategy() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });

        let mut region_metadata2 =
            build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
        region_metadata2.schema_version = 2;

        let (resolved_column_metadatas, region_ids) =
            resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
        assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
        assert_eq!(resolved_column_metadatas, new_column_metadatas);
    }
}