1use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display};
17use std::ops::AddAssign;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::v1::SemanticType;
22use common_procedure::{Context as ProcedureContext, ProcedureId, watcher};
23use common_telemetry::{error, warn};
24use datatypes::schema::{ColumnSchema, Schema};
25use futures::future::{join_all, try_join_all};
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::metadata::{ColumnMetadata, RegionMetadata};
28use store_api::storage::consts::ReservedColumnId;
29use store_api::storage::{RegionId, TableId};
30use table::metadata::{TableInfo, TableMeta};
31use table::table_name::TableName;
32use table::table_reference::TableReference;
33
34use crate::cache_invalidator::CacheInvalidatorRef;
35use crate::error::{
36 ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
37 MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
38 ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
39 WaitProcedureSnafu,
40};
41use crate::key::TableMetadataManagerRef;
42use crate::metrics;
43use crate::node_manager::NodeManagerRef;
44use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
45use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
46use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
47
48#[derive(Debug, PartialEq, Eq)]
49pub(crate) struct PartialRegionMetadata<'a> {
50 pub(crate) column_metadatas: &'a [ColumnMetadata],
51 pub(crate) primary_key: &'a [u32],
52 pub(crate) table_id: TableId,
53}
54
55impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
56 fn from(region_metadata: &'a RegionMetadata) -> Self {
57 Self {
58 column_metadatas: ®ion_metadata.column_metadatas,
59 primary_key: ®ion_metadata.primary_key,
60 table_id: region_metadata.region_id.table_id(),
61 }
62 }
63}
64
65pub(crate) fn check_column_metadatas_consistent(
75 region_metadatas: &[RegionMetadata],
76) -> Option<Vec<ColumnMetadata>> {
77 let is_column_metadata_consistent = region_metadatas
78 .windows(2)
79 .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
80
81 if !is_column_metadata_consistent {
82 return None;
83 }
84
85 Some(region_metadatas[0].column_metadatas.clone())
86}
87
88pub(crate) fn resolve_column_metadatas_with_metasrv(
96 column_metadatas: &[ColumnMetadata],
97 region_metadatas: &[RegionMetadata],
98) -> Result<Vec<RegionId>> {
99 let is_same_table = region_metadatas
100 .windows(2)
101 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
102
103 ensure!(
104 is_same_table,
105 UnexpectedSnafu {
106 err_msg: "Region metadatas are not from the same table"
107 }
108 );
109
110 let mut regions_ids = vec![];
111 for region_metadata in region_metadatas {
112 if region_metadata.column_metadatas != column_metadatas {
113 check_column_metadata_invariants(column_metadatas, ®ion_metadata.column_metadatas)?;
114 regions_ids.push(region_metadata.region_id);
115 }
116 }
117 Ok(regions_ids)
118}
119
120pub(crate) fn resolve_column_metadatas_with_latest(
128 region_metadatas: &[RegionMetadata],
129) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
130 let is_same_table = region_metadatas
131 .windows(2)
132 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
133
134 ensure!(
135 is_same_table,
136 UnexpectedSnafu {
137 err_msg: "Region metadatas are not from the same table"
138 }
139 );
140
141 let latest_region_metadata = region_metadatas
142 .iter()
143 .max_by_key(|c| c.schema_version)
144 .context(UnexpectedSnafu {
145 err_msg: "All Region metadatas have the same schema version",
146 })?;
147 let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
148
149 let mut region_ids = vec![];
150 for region_metadata in region_metadatas {
151 if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
152 check_column_metadata_invariants(
153 &latest_region_metadata.column_metadatas,
154 ®ion_metadata.column_metadatas,
155 )?;
156 region_ids.push(region_metadata.region_id);
157 }
158 }
159
160 Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
162}
163
164pub(crate) fn build_column_metadata_from_table_info(
172 column_schemas: &[ColumnSchema],
173 primary_key_indexes: &[usize],
174 name_to_ids: &HashMap<String, u32>,
175) -> Result<Vec<ColumnMetadata>> {
176 let primary_names = primary_key_indexes
177 .iter()
178 .map(|i| column_schemas[*i].name.as_str())
179 .collect::<HashSet<_>>();
180
181 column_schemas
182 .iter()
183 .map(|column_schema| {
184 let column_id = *name_to_ids
185 .get(column_schema.name.as_str())
186 .with_context(|| UnexpectedSnafu {
187 err_msg: format!(
188 "Column name {} not found in name_to_ids",
189 column_schema.name
190 ),
191 })?;
192
193 let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
194 SemanticType::Tag
195 } else if column_schema.is_time_index() {
196 SemanticType::Timestamp
197 } else {
198 SemanticType::Field
199 };
200 Ok(ColumnMetadata {
201 column_schema: column_schema.clone(),
202 semantic_type,
203 column_id,
204 })
205 })
206 .collect::<Result<Vec<_>>>()
207}
208
209pub(crate) fn check_column_metadata_invariants(
215 new_column_metadatas: &[ColumnMetadata],
216 column_metadatas: &[ColumnMetadata],
217) -> Result<()> {
218 let new_primary_keys = new_column_metadatas
219 .iter()
220 .filter(|c| c.semantic_type == SemanticType::Tag)
221 .map(|c| (c.column_schema.name.as_str(), c.column_id))
222 .collect::<HashMap<_, _>>();
223
224 let old_primary_keys = column_metadatas
225 .iter()
226 .filter(|c| c.semantic_type == SemanticType::Tag)
227 .map(|c| (c.column_schema.name.as_str(), c.column_id));
228
229 for (name, id) in old_primary_keys {
230 let column_id = new_primary_keys
231 .get(name)
232 .cloned()
233 .context(ColumnNotFoundSnafu {
234 column_name: name,
235 column_id: id,
236 })?;
237
238 ensure!(
239 column_id == id,
240 ColumnIdMismatchSnafu {
241 column_name: name,
242 expected_column_id: id,
243 actual_column_id: column_id,
244 }
245 );
246 }
247
248 let new_ts_column = new_column_metadatas
249 .iter()
250 .find(|c| c.semantic_type == SemanticType::Timestamp)
251 .map(|c| (c.column_schema.name.as_str(), c.column_id))
252 .context(UnexpectedSnafu {
253 err_msg: "Timestamp column not found in new column metadata",
254 })?;
255
256 let old_ts_column = column_metadatas
257 .iter()
258 .find(|c| c.semantic_type == SemanticType::Timestamp)
259 .map(|c| (c.column_schema.name.as_str(), c.column_id))
260 .context(UnexpectedSnafu {
261 err_msg: "Timestamp column not found in column metadata",
262 })?;
263 ensure!(
264 new_ts_column == old_ts_column,
265 TimestampMismatchSnafu {
266 expected_column_name: old_ts_column.0,
267 expected_column_id: old_ts_column.1,
268 actual_column_name: new_ts_column.0,
269 actual_column_id: new_ts_column.1,
270 }
271 );
272
273 Ok(())
274}
275
276pub(crate) fn build_table_meta_from_column_metadatas(
286 table_id: TableId,
287 table_ref: TableReference,
288 table_meta: &TableMeta,
289 name_to_ids: Option<HashMap<String, u32>>,
290 column_metadata: &[ColumnMetadata],
291) -> Result<TableMeta> {
292 let column_in_column_metadata = column_metadata
293 .iter()
294 .map(|c| (c.column_schema.name.as_str(), c))
295 .collect::<HashMap<_, _>>();
296 let column_schemas = table_meta.schema.column_schemas();
297 let primary_key_names = table_meta
298 .primary_key_indices
299 .iter()
300 .map(|i| column_schemas[*i].name.as_str())
301 .collect::<HashSet<_>>();
302 let partition_key_names = table_meta
303 .partition_key_indices
304 .iter()
305 .map(|i| column_schemas[*i].name.as_str())
306 .collect::<HashSet<_>>();
307 ensure!(
308 column_metadata
309 .iter()
310 .any(|c| c.semantic_type == SemanticType::Timestamp),
311 UnexpectedSnafu {
312 err_msg: format!(
313 "Missing table index in column metadata, table: {}, table_id: {}",
314 table_ref, table_id
315 ),
316 }
317 );
318
319 if let Some(name_to_ids) = &name_to_ids {
320 for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
322 let column_in_column_metadata = column_in_column_metadata
323 .get(column_name)
324 .with_context(|| MissingColumnInColumnMetadataSnafu {
325 column_name: column_name.to_string(),
326 table_name: table_ref.to_string(),
327 table_id,
328 })?;
329
330 let column_id = *name_to_ids
331 .get(*column_name)
332 .with_context(|| UnexpectedSnafu {
333 err_msg: format!("column id not found in name_to_ids: {}", column_name),
334 })?;
335 ensure!(
336 column_id == column_in_column_metadata.column_id,
337 MismatchColumnIdSnafu {
338 column_name: column_name.to_string(),
339 column_id,
340 table_name: table_ref.to_string(),
341 table_id,
342 }
343 );
344 }
345 } else {
346 warn!(
347 "`name_to_ids` is not provided, table: {}, table_id: {}",
348 table_ref, table_id
349 );
350 }
351
352 let mut new_raw_table_meta = table_meta.clone();
353 let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
354 let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
355 let value_indices = &mut new_raw_table_meta.value_indices;
356 let mut columns = Vec::with_capacity(column_metadata.len());
357 let column_ids = &mut new_raw_table_meta.column_ids;
358 let next_column_id = &mut new_raw_table_meta.next_column_id;
359
360 column_ids.clear();
361 value_indices.clear();
362 primary_key_indices.clear();
363 partition_key_indices.clear();
364
365 for (idx, col) in column_metadata.iter().enumerate() {
366 if partition_key_names.contains(&col.column_schema.name.as_str()) {
367 partition_key_indices.push(idx);
368 }
369 match col.semantic_type {
370 SemanticType::Tag => {
371 primary_key_indices.push(idx);
372 }
373 SemanticType::Field => {
374 value_indices.push(idx);
375 }
376 SemanticType::Timestamp => {
377 value_indices.push(idx);
378 }
379 }
380
381 columns.push(col.column_schema.clone());
382 column_ids.push(col.column_id);
383 }
384
385 *next_column_id = column_ids
386 .iter()
387 .filter(|id| !ReservedColumnId::is_reserved(**id))
388 .max()
389 .map(|max| max + 1)
390 .unwrap_or(*next_column_id)
391 .max(*next_column_id);
392
393 new_raw_table_meta.schema = Arc::new(Schema::new_with_version(
394 columns,
395 table_meta.schema.version(),
396 ));
397 Ok(new_raw_table_meta)
398}
399
400pub(crate) fn need_update_logical_table_info(
405 table_info: &TableInfo,
406 column_metadatas: &[ColumnMetadata],
407) -> bool {
408 table_info.meta.schema.column_schemas().len() != column_metadatas.len()
409}
410
/// Outcome of waiting for subprocedures when at least one of them failed.
pub struct PartialSuccessResult<'a> {
    /// Subprocedures whose wait returned an error.
    pub failed_procedures: Vec<&'a SubprocedureMeta>,
    /// Subprocedures whose wait completed without an error.
    pub success_procedures: Vec<&'a SubprocedureMeta>,
}

/// Result of [`wait_for_inflight_subprocedures`].
pub enum WaitForInflightSubproceduresResult<'a> {
    /// All awaited subprocedures are reported as successful.
    Success(Vec<&'a SubprocedureMeta>),
    /// Some subprocedures failed; produced only when failures are not fail-fast.
    PartialSuccess(PartialSuccessResult<'a>),
}
422
423pub(crate) async fn wait_for_inflight_subprocedures<'a>(
428 procedure_ctx: &ProcedureContext,
429 subprocedures: &'a [SubprocedureMeta],
430 fail_fast: bool,
431) -> Result<WaitForInflightSubproceduresResult<'a>> {
432 let mut receivers = Vec::with_capacity(subprocedures.len());
433 for subprocedure in subprocedures {
434 let procedure_id = subprocedure.procedure_id();
435 let receiver = procedure_ctx
436 .provider
437 .procedure_state_receiver(procedure_id)
438 .await
439 .context(ProcedureStateReceiverSnafu { procedure_id })?
440 .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
441 receivers.push((receiver, subprocedure));
442 }
443
444 let mut tasks = Vec::with_capacity(receivers.len());
445 for (receiver, subprocedure) in receivers.iter_mut() {
446 tasks.push(async move {
447 watcher::wait(receiver).await.inspect_err(|e| {
448 error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
449 })
450 });
451 }
452
453 if fail_fast {
454 try_join_all(tasks).await.context(WaitProcedureSnafu)?;
455 return Ok(WaitForInflightSubproceduresResult::Success(
456 subprocedures.iter().collect(),
457 ));
458 }
459
460 let results = join_all(tasks).await;
462 let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
463 if failed_procedures_num == 0 {
464 return Ok(WaitForInflightSubproceduresResult::Success(
465 subprocedures.iter().collect(),
466 ));
467 }
468 warn!(
469 "{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
470 failed_procedures_num,
471 subprocedures.len(),
472 procedure_ctx.procedure_id
473 );
474
475 let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
476 let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
477 for (result, subprocedure) in results.into_iter().zip(subprocedures) {
478 if result.is_err() {
479 failed_procedures.push(subprocedure);
480 } else {
481 success_procedures.push(subprocedure);
482 }
483 }
484
485 Ok(WaitForInflightSubproceduresResult::PartialSuccess(
486 PartialSuccessResult {
487 failed_procedures,
488 success_procedures,
489 },
490 ))
491}
492
/// Handles shared by reconciliation code (NOTE(review): the users of this context are
/// outside this file — confirm which procedures construct it).
#[derive(Clone)]
pub struct Context {
    /// Node manager handle (presumably used to reach datanodes — TODO confirm).
    pub node_manager: NodeManagerRef,
    /// Table metadata manager handle.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator handle.
    pub cache_invalidator: CacheInvalidatorRef,
}
499
/// Describes a subprocedure that reconciles one physical table.
pub struct PhysicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table being reconciled.
    pub table_id: TableId,
    /// Name of the physical table being reconciled.
    pub table_name: TableName,
}

/// Describes a subprocedure that reconciles logical tables associated with one physical table.
pub struct LogicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table.
    pub physical_table_id: TableId,
    /// Name of the physical table.
    pub physical_table_name: TableName,
    /// Id and name of every logical table handled by the subprocedure.
    pub logical_tables: Vec<(TableId, TableName)>,
}

/// Describes a subprocedure that reconciles one database (`catalog`.`schema`).
pub struct ReconcileDatabaseMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Catalog of the database.
    pub catalog: String,
    /// Schema (database) name.
    pub schema: String,
}

/// Metadata of a reconciliation subprocedure, used for logging (via `Display`) and for
/// counting reconciled tables and databases in the metrics conversions.
pub enum SubprocedureMeta {
    /// Reconciliation of a physical table.
    PhysicalTable(PhysicalTableMeta),
    /// Reconciliation of logical tables.
    LogicalTable(LogicalTableMeta),
    /// Reconciliation of a database.
    Database(ReconcileDatabaseMeta),
}
528
529impl Display for SubprocedureMeta {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 match self {
532 SubprocedureMeta::PhysicalTable(meta) => {
533 write!(
534 f,
535 "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
536 meta.procedure_id, meta.table_id, meta.table_name
537 )
538 }
539 SubprocedureMeta::LogicalTable(meta) => {
540 write!(
541 f,
542 "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
543 meta.procedure_id,
544 meta.physical_table_id,
545 meta.physical_table_name,
546 meta.logical_tables
547 )
548 }
549 SubprocedureMeta::Database(meta) => {
550 write!(
551 f,
552 "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
553 meta.procedure_id, meta.catalog, meta.schema
554 )
555 }
556 }
557 }
558}
559
560impl SubprocedureMeta {
561 pub fn new_logical_table(
563 procedure_id: ProcedureId,
564 physical_table_id: TableId,
565 physical_table_name: TableName,
566 logical_tables: Vec<(TableId, TableName)>,
567 ) -> Self {
568 Self::LogicalTable(LogicalTableMeta {
569 procedure_id,
570 physical_table_id,
571 physical_table_name,
572 logical_tables,
573 })
574 }
575
576 pub fn new_physical_table(
578 procedure_id: ProcedureId,
579 table_id: TableId,
580 table_name: TableName,
581 ) -> Self {
582 Self::PhysicalTable(PhysicalTableMeta {
583 procedure_id,
584 table_id,
585 table_name,
586 })
587 }
588
589 pub fn new_reconcile_database(
591 procedure_id: ProcedureId,
592 catalog: String,
593 schema: String,
594 ) -> Self {
595 Self::Database(ReconcileDatabaseMeta {
596 procedure_id,
597 catalog,
598 schema,
599 })
600 }
601
602 pub fn procedure_id(&self) -> ProcedureId {
604 match self {
605 SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
606 SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
607 SubprocedureMeta::Database(meta) => meta.procedure_id,
608 }
609 }
610
611 pub fn table_num(&self) -> usize {
613 match self {
614 SubprocedureMeta::PhysicalTable(_) => 1,
615 SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
616 SubprocedureMeta::Database(_) => 0,
617 }
618 }
619
620 pub fn database_num(&self) -> usize {
622 match self {
623 SubprocedureMeta::Database(_) => 1,
624 _ => 0,
625 }
626 }
627}
628
/// Outcome counters of a catalog reconciliation, counted in databases.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
    /// Number of databases whose reconciliation succeeded.
    pub succeeded_databases: usize,
    /// Number of databases whose reconciliation failed.
    pub failed_databases: usize,
}

impl AddAssign for ReconcileCatalogMetrics {
    /// Adds the counters of `other` to `self`, field by field.
    fn add_assign(&mut self, other: Self) {
        let Self {
            succeeded_databases,
            failed_databases,
        } = other;
        self.succeeded_databases += succeeded_databases;
        self.failed_databases += failed_databases;
    }
}

impl Display for ReconcileCatalogMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_databases,
            failed_databases,
        } = self;
        write!(
            f,
            "succeeded_databases: {}, failed_databases: {}",
            succeeded_databases, failed_databases
        )
    }
}
652
653impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
654 fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
655 match result {
656 WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
657 succeeded_databases: subprocedures.len(),
658 failed_databases: 0,
659 },
660 WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
661 failed_procedures,
662 success_procedures,
663 }) => {
664 let succeeded_databases = success_procedures
665 .iter()
666 .map(|subprocedure| subprocedure.database_num())
667 .sum();
668 let failed_databases = failed_procedures
669 .iter()
670 .map(|subprocedure| subprocedure.database_num())
671 .sum();
672 ReconcileCatalogMetrics {
673 succeeded_databases,
674 failed_databases,
675 }
676 }
677 }
678 }
679}
680
/// Outcome counters of a database reconciliation, counted both in tables and in
/// subprocedures.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
    /// Number of tables covered by succeeded subprocedures.
    pub succeeded_tables: usize,
    /// Number of tables covered by failed subprocedures.
    pub failed_tables: usize,
    /// Number of subprocedures that succeeded.
    pub succeeded_procedures: usize,
    /// Number of subprocedures that failed.
    pub failed_procedures: usize,
}

impl AddAssign for ReconcileDatabaseMetrics {
    /// Adds the counters of `rhs` to `self`, field by field.
    fn add_assign(&mut self, rhs: Self) {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = rhs;
        self.succeeded_tables += succeeded_tables;
        self.failed_tables += failed_tables;
        self.succeeded_procedures += succeeded_procedures;
        self.failed_procedures += failed_procedures;
    }
}

impl Display for ReconcileDatabaseMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = self;
        write!(
            f,
            "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}",
            succeeded_tables, failed_tables, succeeded_procedures, failed_procedures
        )
    }
}
711
712impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
713 fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
714 match result {
715 WaitForInflightSubproceduresResult::Success(subprocedures) => {
716 let table_num = subprocedures
717 .iter()
718 .map(|subprocedure| subprocedure.table_num())
719 .sum();
720 ReconcileDatabaseMetrics {
721 succeeded_procedures: subprocedures.len(),
722 failed_procedures: 0,
723 succeeded_tables: table_num,
724 failed_tables: 0,
725 }
726 }
727 WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
728 failed_procedures,
729 success_procedures,
730 }) => {
731 let succeeded_tables = success_procedures
732 .iter()
733 .map(|subprocedure| subprocedure.table_num())
734 .sum();
735 let failed_tables = failed_procedures
736 .iter()
737 .map(|subprocedure| subprocedure.table_num())
738 .sum();
739 ReconcileDatabaseMetrics {
740 succeeded_procedures: success_procedures.len(),
741 failed_procedures: failed_procedures.len(),
742 succeeded_tables,
743 failed_tables,
744 }
745 }
746 }
747 }
748}
749
/// Counters collected while reconciling logical tables.
///
/// The counters are reported to `metrics::METRIC_META_RECONCILIATION_STATS` when the value
/// is dropped. NOTE(review): the type is `Clone`, and every clone reports its own counters on
/// drop, so a clone that is dropped alongside the original double-counts — confirm this does
/// not happen.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
    /// When reconciliation started; `Display` reports the time elapsed since then.
    pub start_time: Instant,
    /// Number of logical tables whose table info was updated.
    pub update_table_info_count: usize,
    /// Number of logical tables that were created.
    pub create_tables_count: usize,
    /// Number of logical tables whose column metadata was consistent.
    pub column_metadata_consistent_count: usize,
    /// Number of logical tables whose column metadata was inconsistent.
    pub column_metadata_inconsistent_count: usize,
}

impl Default for ReconcileLogicalTableMetrics {
    /// Starts the clock now, with every counter at zero.
    fn default() -> Self {
        ReconcileLogicalTableMetrics {
            column_metadata_inconsistent_count: 0,
            column_metadata_consistent_count: 0,
            create_tables_count: 0,
            update_table_info_count: 0,
            start_time: Instant::now(),
        }
    }
}

// Label values of the last label of `metrics::METRIC_META_RECONCILIATION_STATS`.
const CREATE_TABLES: &str = "create_tables";
const UPDATE_TABLE_INFO: &str = "update_table_info";
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";

impl ReconcileLogicalTableMetrics {
    /// Sum of created, consistent and inconsistent tables.
    ///
    /// `update_table_info_count` is not added (NOTE(review): presumably those tables are
    /// already counted in another bucket — confirm).
    pub fn total_table_count(&self) -> usize {
        [
            self.create_tables_count,
            self.column_metadata_consistent_count,
            self.column_metadata_inconsistent_count,
        ]
        .into_iter()
        .sum()
    }
}
785
786impl Drop for ReconcileLogicalTableMetrics {
787 fn drop(&mut self) {
788 let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
789 metrics::METRIC_META_RECONCILIATION_STATS
790 .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
791 .inc_by(self.create_tables_count as u64);
792 metrics::METRIC_META_RECONCILIATION_STATS
793 .with_label_values(&[
794 procedure_name,
795 metrics::TABLE_TYPE_LOGICAL,
796 UPDATE_TABLE_INFO,
797 ])
798 .inc_by(self.update_table_info_count as u64);
799 metrics::METRIC_META_RECONCILIATION_STATS
800 .with_label_values(&[
801 procedure_name,
802 metrics::TABLE_TYPE_LOGICAL,
803 COLUMN_METADATA_CONSISTENT,
804 ])
805 .inc_by(self.column_metadata_consistent_count as u64);
806 metrics::METRIC_META_RECONCILIATION_STATS
807 .with_label_values(&[
808 procedure_name,
809 metrics::TABLE_TYPE_LOGICAL,
810 COLUMN_METADATA_INCONSISTENT,
811 ])
812 .inc_by(self.column_metadata_inconsistent_count as u64);
813 }
814}
815
816impl Display for ReconcileLogicalTableMetrics {
817 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
818 let elapsed = self.start_time.elapsed();
819 if self.create_tables_count > 0 {
820 write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
821 }
822 if self.update_table_info_count > 0 {
823 write!(
824 f,
825 "update_table_info_count: {}, ",
826 self.update_table_info_count
827 )?;
828 }
829 if self.column_metadata_consistent_count > 0 {
830 write!(
831 f,
832 "column_metadata_consistent_count: {}, ",
833 self.column_metadata_consistent_count
834 )?;
835 }
836 if self.column_metadata_inconsistent_count > 0 {
837 write!(
838 f,
839 "column_metadata_inconsistent_count: {}, ",
840 self.column_metadata_inconsistent_count
841 )?;
842 }
843
844 write!(
845 f,
846 "total_table_count: {}, elapsed: {:?}",
847 self.total_table_count(),
848 elapsed
849 )
850 }
851}
852
853#[derive(Clone, Copy)]
855pub enum ResolveColumnMetadataResult {
856 Consistent,
857 Inconsistent(ResolveStrategy),
858}
859
860impl Display for ResolveColumnMetadataResult {
861 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
862 match self {
863 ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
864 ResolveColumnMetadataResult::Inconsistent(strategy) => {
865 let strategy_str = strategy.as_ref();
866 write!(f, "Inconsistent({})", strategy_str)
867 }
868 }
869 }
870}
871
/// Metrics of a single physical table reconciliation, reported to the
/// `metrics::METRIC_META_RECONCILIATION_*` counters when the value is dropped.
///
/// NOTE(review): the type is `Clone` and every clone reports on drop, so dropping a clone
/// alongside the original double-counts — confirm this does not happen.
#[derive(Clone)]
pub struct ReconcileTableMetrics {
    /// When reconciliation started; `Display` reports the time elapsed since then.
    pub start_time: Instant,
    /// How the column metadata was resolved; `None` when no result was recorded.
    pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
    /// Whether the table info was updated.
    pub update_table_info: bool,
}
882
883impl Drop for ReconcileTableMetrics {
884 fn drop(&mut self) {
885 if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
886 match resolve_column_metadata_result {
887 ResolveColumnMetadataResult::Consistent => {
888 metrics::METRIC_META_RECONCILIATION_STATS
889 .with_label_values(&[
890 ReconcileTableProcedure::TYPE_NAME,
891 metrics::TABLE_TYPE_PHYSICAL,
892 COLUMN_METADATA_CONSISTENT,
893 ])
894 .inc();
895 }
896 ResolveColumnMetadataResult::Inconsistent(strategy) => {
897 metrics::METRIC_META_RECONCILIATION_STATS
898 .with_label_values(&[
899 ReconcileTableProcedure::TYPE_NAME,
900 metrics::TABLE_TYPE_PHYSICAL,
901 COLUMN_METADATA_INCONSISTENT,
902 ])
903 .inc();
904 metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
905 .with_label_values(&[strategy.as_ref()])
906 .inc();
907 }
908 }
909 }
910 if self.update_table_info {
911 metrics::METRIC_META_RECONCILIATION_STATS
912 .with_label_values(&[
913 ReconcileTableProcedure::TYPE_NAME,
914 metrics::TABLE_TYPE_PHYSICAL,
915 UPDATE_TABLE_INFO,
916 ])
917 .inc();
918 }
919 }
920}
921
922impl Default for ReconcileTableMetrics {
923 fn default() -> Self {
924 Self {
925 start_time: Instant::now(),
926 resolve_column_metadata_result: None,
927 update_table_info: false,
928 }
929 }
930}
931
932impl Display for ReconcileTableMetrics {
933 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
934 let elapsed = self.start_time.elapsed();
935 if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
936 write!(
937 f,
938 "resolve_column_metadata_result: {}, ",
939 resolve_column_metadata_result
940 )?;
941 }
942 write!(
943 f,
944 "update_table_info: {}, elapsed: {:?}",
945 self.update_table_info, elapsed
946 )
947 }
948}
949
950#[cfg(test)]
951mod tests {
952 use std::assert_matches::assert_matches;
953 use std::collections::HashMap;
954 use std::sync::Arc;
955
956 use api::v1::SemanticType;
957 use datatypes::prelude::ConcreteDataType;
958 use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
959 use store_api::metadata::ColumnMetadata;
960 use store_api::storage::RegionId;
961 use table::metadata::TableMetaBuilder;
962 use table::table_reference::TableReference;
963
964 use super::*;
965 use crate::ddl::test_util::region_metadata::build_region_metadata;
966 use crate::error::Error;
967 use crate::reconciliation::utils::check_column_metadatas_consistent;
968
969 fn new_test_schema() -> Schema {
970 let column_schemas = vec![
971 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
972 ColumnSchema::new(
973 "ts",
974 ConcreteDataType::timestamp_millisecond_datatype(),
975 false,
976 )
977 .with_time_index(true),
978 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
979 ];
980 SchemaBuilder::try_from(column_schemas)
981 .unwrap()
982 .version(123)
983 .build()
984 .unwrap()
985 }
986
987 fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
988 vec![
989 ColumnMetadata {
990 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
991 semantic_type: SemanticType::Tag,
992 column_id: 0,
993 },
994 ColumnMetadata {
995 column_schema: ColumnSchema::new(
996 "ts",
997 ConcreteDataType::timestamp_millisecond_datatype(),
998 false,
999 )
1000 .with_time_index(true),
1001 semantic_type: SemanticType::Timestamp,
1002 column_id: 1,
1003 },
1004 ColumnMetadata {
1005 column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1006 semantic_type: SemanticType::Field,
1007 column_id: 2,
1008 },
1009 ]
1010 }
1011
1012 fn new_test_raw_table_info() -> TableMeta {
1013 let mut table_meta_builder = TableMetaBuilder::empty();
1014 table_meta_builder
1015 .schema(Arc::new(new_test_schema()))
1016 .primary_key_indices(vec![0])
1017 .partition_key_indices(vec![2])
1018 .next_column_id(4)
1019 .build()
1020 .unwrap()
1021 }
1022
1023 #[test]
1024 fn test_build_table_info_from_column_metadatas_identical() {
1025 let column_metadatas = new_test_column_metadatas();
1026 let table_id = 1;
1027 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1028 let mut table_meta = new_test_raw_table_info();
1029 table_meta.column_ids = vec![0, 1, 2];
1030 let name_to_ids = HashMap::from([
1031 ("col1".to_string(), 0),
1032 ("ts".to_string(), 1),
1033 ("col2".to_string(), 2),
1034 ]);
1035
1036 let new_table_meta = build_table_meta_from_column_metadatas(
1037 table_id,
1038 table_ref,
1039 &table_meta,
1040 Some(name_to_ids),
1041 &column_metadatas,
1042 )
1043 .unwrap();
1044 assert_eq!(new_table_meta, table_meta);
1045 }
1046
1047 #[test]
1048 fn test_build_table_info_from_column_metadatas() {
1049 let mut column_metadatas = new_test_column_metadatas();
1050 column_metadatas.push(ColumnMetadata {
1051 column_schema: ColumnSchema::new(
1052 "__table_id",
1053 ConcreteDataType::string_datatype(),
1054 true,
1055 ),
1056 semantic_type: SemanticType::Tag,
1057 column_id: ReservedColumnId::table_id(),
1058 });
1059
1060 let table_id = 1;
1061 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1062 let table_meta = new_test_raw_table_info();
1063 let name_to_ids = HashMap::from([
1064 ("col1".to_string(), 0),
1065 ("ts".to_string(), 1),
1066 ("col2".to_string(), 2),
1067 ]);
1068
1069 let new_table_meta = build_table_meta_from_column_metadatas(
1070 table_id,
1071 table_ref,
1072 &table_meta,
1073 Some(name_to_ids),
1074 &column_metadatas,
1075 )
1076 .unwrap();
1077
1078 assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
1079 assert_eq!(new_table_meta.partition_key_indices, vec![2]);
1080 assert_eq!(new_table_meta.value_indices, vec![1, 2]);
1081 assert_eq!(new_table_meta.schema.timestamp_index(), Some(1));
1082 assert_eq!(
1083 new_table_meta.column_ids,
1084 vec![0, 1, 2, ReservedColumnId::table_id()]
1085 );
1086 assert_eq!(new_table_meta.next_column_id, table_meta.next_column_id);
1087 }
1088
1089 #[test]
1090 fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
1091 let column_metadatas = new_test_column_metadatas();
1092 let table_id = 1;
1093 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1094 let table_meta = new_test_raw_table_info();
1095 let name_to_ids = HashMap::from([
1096 ("col1".to_string(), 0),
1097 ("ts".to_string(), 1),
1098 ("col2".to_string(), 3),
1100 ]);
1101
1102 let err = build_table_meta_from_column_metadatas(
1103 table_id,
1104 table_ref,
1105 &table_meta,
1106 Some(name_to_ids),
1107 &column_metadatas,
1108 )
1109 .unwrap_err();
1110
1111 assert_matches!(err, Error::MismatchColumnId { .. });
1112 }
1113
1114 #[test]
1115 fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
1116 let mut column_metadatas = new_test_column_metadatas();
1117 column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1118 let table_id = 1;
1119 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1120 let table_meta = new_test_raw_table_info();
1121 let name_to_ids = HashMap::from([
1122 ("col1".to_string(), 0),
1123 ("ts".to_string(), 1),
1124 ("col2".to_string(), 2),
1125 ]);
1126
1127 let err = build_table_meta_from_column_metadatas(
1128 table_id,
1129 table_ref,
1130 &table_meta,
1131 Some(name_to_ids),
1132 &column_metadatas,
1133 )
1134 .unwrap_err();
1135
1136 assert!(
1137 err.to_string()
1138 .contains("Missing table index in column metadata"),
1139 "err: {}",
1140 err
1141 );
1142 }
1143
1144 #[test]
1145 fn test_build_table_info_from_column_metadatas_with_missing_column() {
1146 let mut column_metadatas = new_test_column_metadatas();
1147 column_metadatas.retain(|c| c.column_id != 0);
1149 let table_id = 1;
1150 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1151 let table_meta = new_test_raw_table_info();
1152 let name_to_ids = HashMap::from([
1153 ("col1".to_string(), 0),
1154 ("ts".to_string(), 1),
1155 ("col2".to_string(), 2),
1156 ]);
1157
1158 let err = build_table_meta_from_column_metadatas(
1159 table_id,
1160 table_ref,
1161 &table_meta,
1162 Some(name_to_ids.clone()),
1163 &column_metadatas,
1164 )
1165 .unwrap_err();
1166 assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1167
1168 let mut column_metadatas = new_test_column_metadatas();
1169 column_metadatas.retain(|c| c.column_id != 2);
1171
1172 let err = build_table_meta_from_column_metadatas(
1173 table_id,
1174 table_ref,
1175 &table_meta,
1176 Some(name_to_ids),
1177 &column_metadatas,
1178 )
1179 .unwrap_err();
1180 assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1181 }
1182
1183 #[test]
1184 fn test_check_column_metadatas_consistent() {
1185 let column_metadatas = new_test_column_metadatas();
1186 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1187 let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1188 let result =
1189 check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
1190 assert_eq!(result, column_metadatas);
1191
1192 let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
1193 let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1194 let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
1195 assert!(result.is_none());
1196 }
1197
1198 #[test]
1199 fn test_check_column_metadata_invariants() {
1200 let column_metadatas = new_test_column_metadatas();
1201 let mut new_column_metadatas = column_metadatas.clone();
1202 new_column_metadatas.push(ColumnMetadata {
1203 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1204 semantic_type: SemanticType::Field,
1205 column_id: 3,
1206 });
1207 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
1208 }
1209
1210 #[test]
1211 fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
1212 let column_metadatas = new_test_column_metadatas();
1213 let mut new_column_metadatas = column_metadatas.clone();
1214 new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1215 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1216
1217 let column_metadatas = new_test_column_metadatas();
1218 let mut new_column_metadatas = column_metadatas.clone();
1219 new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
1220 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1221 }
1222
1223 #[test]
1224 fn test_check_column_metadata_invariants_mismatch_column_id() {
1225 let column_metadatas = new_test_column_metadatas();
1226 let mut new_column_metadatas = column_metadatas.clone();
1227 if let Some(col) = new_column_metadatas
1228 .iter_mut()
1229 .find(|c| c.semantic_type == SemanticType::Timestamp)
1230 {
1231 col.column_id = 100;
1232 }
1233 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1234
1235 let column_metadatas = new_test_column_metadatas();
1236 let mut new_column_metadatas = column_metadatas.clone();
1237 if let Some(col) = new_column_metadatas
1238 .iter_mut()
1239 .find(|c| c.semantic_type == SemanticType::Tag)
1240 {
1241 col.column_id = 100;
1242 }
1243 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1244 }
1245
1246 #[test]
1247 fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
1248 let column_metadatas = new_test_column_metadatas();
1249 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1250 let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
1251 metasrv_column_metadatas.push(ColumnMetadata {
1252 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1253 semantic_type: SemanticType::Field,
1254 column_id: 3,
1255 });
1256 let result =
1257 resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
1258 .unwrap();
1259
1260 assert_eq!(result, vec![RegionId::new(1024, 0)]);
1261 }
1262
1263 #[test]
1264 fn test_resolve_column_metadatas_with_use_latest_strategy() {
1265 let column_metadatas = new_test_column_metadatas();
1266 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1267 let mut new_column_metadatas = column_metadatas.clone();
1268 new_column_metadatas.push(ColumnMetadata {
1269 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1270 semantic_type: SemanticType::Field,
1271 column_id: 3,
1272 });
1273
1274 let mut region_metadata2 =
1275 build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
1276 region_metadata2.schema_version = 2;
1277
1278 let (resolved_column_metadatas, region_ids) =
1279 resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
1280 assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
1281 assert_eq!(resolved_column_metadatas, new_column_metadatas);
1282 }
1283}