1use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display};
17use std::ops::AddAssign;
18use std::time::Instant;
19
20use api::v1::SemanticType;
21use common_procedure::{Context as ProcedureContext, ProcedureId, watcher};
22use common_telemetry::{error, warn};
23use datatypes::schema::ColumnSchema;
24use futures::future::{join_all, try_join_all};
25use snafu::{OptionExt, ResultExt, ensure};
26use store_api::metadata::{ColumnMetadata, RegionMetadata};
27use store_api::storage::consts::ReservedColumnId;
28use store_api::storage::{RegionId, TableId};
29use table::metadata::{RawTableInfo, RawTableMeta};
30use table::table_name::TableName;
31use table::table_reference::TableReference;
32
33use crate::cache_invalidator::CacheInvalidatorRef;
34use crate::error::{
35 ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
36 MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
37 ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
38 WaitProcedureSnafu,
39};
40use crate::key::TableMetadataManagerRef;
41use crate::metrics;
42use crate::node_manager::NodeManagerRef;
43use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
44use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
45use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
46
47#[derive(Debug, PartialEq, Eq)]
48pub(crate) struct PartialRegionMetadata<'a> {
49 pub(crate) column_metadatas: &'a [ColumnMetadata],
50 pub(crate) primary_key: &'a [u32],
51 pub(crate) table_id: TableId,
52}
53
54impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
55 fn from(region_metadata: &'a RegionMetadata) -> Self {
56 Self {
57 column_metadatas: ®ion_metadata.column_metadatas,
58 primary_key: ®ion_metadata.primary_key,
59 table_id: region_metadata.region_id.table_id(),
60 }
61 }
62}
63
64pub(crate) fn check_column_metadatas_consistent(
74 region_metadatas: &[RegionMetadata],
75) -> Option<Vec<ColumnMetadata>> {
76 let is_column_metadata_consistent = region_metadatas
77 .windows(2)
78 .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
79
80 if !is_column_metadata_consistent {
81 return None;
82 }
83
84 Some(region_metadatas[0].column_metadatas.clone())
85}
86
87pub(crate) fn resolve_column_metadatas_with_metasrv(
95 column_metadatas: &[ColumnMetadata],
96 region_metadatas: &[RegionMetadata],
97) -> Result<Vec<RegionId>> {
98 let is_same_table = region_metadatas
99 .windows(2)
100 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
101
102 ensure!(
103 is_same_table,
104 UnexpectedSnafu {
105 err_msg: "Region metadatas are not from the same table"
106 }
107 );
108
109 let mut regions_ids = vec![];
110 for region_metadata in region_metadatas {
111 if region_metadata.column_metadatas != column_metadatas {
112 check_column_metadata_invariants(column_metadatas, ®ion_metadata.column_metadatas)?;
113 regions_ids.push(region_metadata.region_id);
114 }
115 }
116 Ok(regions_ids)
117}
118
119pub(crate) fn resolve_column_metadatas_with_latest(
127 region_metadatas: &[RegionMetadata],
128) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
129 let is_same_table = region_metadatas
130 .windows(2)
131 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
132
133 ensure!(
134 is_same_table,
135 UnexpectedSnafu {
136 err_msg: "Region metadatas are not from the same table"
137 }
138 );
139
140 let latest_region_metadata = region_metadatas
141 .iter()
142 .max_by_key(|c| c.schema_version)
143 .context(UnexpectedSnafu {
144 err_msg: "All Region metadatas have the same schema version",
145 })?;
146 let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
147
148 let mut region_ids = vec![];
149 for region_metadata in region_metadatas {
150 if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
151 check_column_metadata_invariants(
152 &latest_region_metadata.column_metadatas,
153 ®ion_metadata.column_metadatas,
154 )?;
155 region_ids.push(region_metadata.region_id);
156 }
157 }
158
159 Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
161}
162
163pub(crate) fn build_column_metadata_from_table_info(
171 column_schemas: &[ColumnSchema],
172 primary_key_indexes: &[usize],
173 name_to_ids: &HashMap<String, u32>,
174) -> Result<Vec<ColumnMetadata>> {
175 let primary_names = primary_key_indexes
176 .iter()
177 .map(|i| column_schemas[*i].name.as_str())
178 .collect::<HashSet<_>>();
179
180 column_schemas
181 .iter()
182 .map(|column_schema| {
183 let column_id = *name_to_ids
184 .get(column_schema.name.as_str())
185 .with_context(|| UnexpectedSnafu {
186 err_msg: format!(
187 "Column name {} not found in name_to_ids",
188 column_schema.name
189 ),
190 })?;
191
192 let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
193 SemanticType::Tag
194 } else if column_schema.is_time_index() {
195 SemanticType::Timestamp
196 } else {
197 SemanticType::Field
198 };
199 Ok(ColumnMetadata {
200 column_schema: column_schema.clone(),
201 semantic_type,
202 column_id,
203 })
204 })
205 .collect::<Result<Vec<_>>>()
206}
207
208pub(crate) fn check_column_metadata_invariants(
214 new_column_metadatas: &[ColumnMetadata],
215 column_metadatas: &[ColumnMetadata],
216) -> Result<()> {
217 let new_primary_keys = new_column_metadatas
218 .iter()
219 .filter(|c| c.semantic_type == SemanticType::Tag)
220 .map(|c| (c.column_schema.name.as_str(), c.column_id))
221 .collect::<HashMap<_, _>>();
222
223 let old_primary_keys = column_metadatas
224 .iter()
225 .filter(|c| c.semantic_type == SemanticType::Tag)
226 .map(|c| (c.column_schema.name.as_str(), c.column_id));
227
228 for (name, id) in old_primary_keys {
229 let column_id = new_primary_keys
230 .get(name)
231 .cloned()
232 .context(ColumnNotFoundSnafu {
233 column_name: name,
234 column_id: id,
235 })?;
236
237 ensure!(
238 column_id == id,
239 ColumnIdMismatchSnafu {
240 column_name: name,
241 expected_column_id: id,
242 actual_column_id: column_id,
243 }
244 );
245 }
246
247 let new_ts_column = new_column_metadatas
248 .iter()
249 .find(|c| c.semantic_type == SemanticType::Timestamp)
250 .map(|c| (c.column_schema.name.as_str(), c.column_id))
251 .context(UnexpectedSnafu {
252 err_msg: "Timestamp column not found in new column metadata",
253 })?;
254
255 let old_ts_column = column_metadatas
256 .iter()
257 .find(|c| c.semantic_type == SemanticType::Timestamp)
258 .map(|c| (c.column_schema.name.as_str(), c.column_id))
259 .context(UnexpectedSnafu {
260 err_msg: "Timestamp column not found in column metadata",
261 })?;
262 ensure!(
263 new_ts_column == old_ts_column,
264 TimestampMismatchSnafu {
265 expected_column_name: old_ts_column.0,
266 expected_column_id: old_ts_column.1,
267 actual_column_name: new_ts_column.0,
268 actual_column_id: new_ts_column.1,
269 }
270 );
271
272 Ok(())
273}
274
275pub(crate) fn build_table_meta_from_column_metadatas(
285 table_id: TableId,
286 table_ref: TableReference,
287 table_meta: &RawTableMeta,
288 name_to_ids: Option<HashMap<String, u32>>,
289 column_metadata: &[ColumnMetadata],
290) -> Result<RawTableMeta> {
291 let column_in_column_metadata = column_metadata
292 .iter()
293 .map(|c| (c.column_schema.name.as_str(), c))
294 .collect::<HashMap<_, _>>();
295 let primary_key_names = table_meta
296 .primary_key_indices
297 .iter()
298 .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
299 .collect::<HashSet<_>>();
300 let partition_key_names = table_meta
301 .partition_key_indices
302 .iter()
303 .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
304 .collect::<HashSet<_>>();
305 ensure!(
306 column_metadata
307 .iter()
308 .any(|c| c.semantic_type == SemanticType::Timestamp),
309 UnexpectedSnafu {
310 err_msg: format!(
311 "Missing table index in column metadata, table: {}, table_id: {}",
312 table_ref, table_id
313 ),
314 }
315 );
316
317 if let Some(name_to_ids) = &name_to_ids {
318 for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
320 let column_in_column_metadata = column_in_column_metadata
321 .get(column_name)
322 .with_context(|| MissingColumnInColumnMetadataSnafu {
323 column_name: column_name.to_string(),
324 table_name: table_ref.to_string(),
325 table_id,
326 })?;
327
328 let column_id = *name_to_ids
329 .get(*column_name)
330 .with_context(|| UnexpectedSnafu {
331 err_msg: format!("column id not found in name_to_ids: {}", column_name),
332 })?;
333 ensure!(
334 column_id == column_in_column_metadata.column_id,
335 MismatchColumnIdSnafu {
336 column_name: column_name.to_string(),
337 column_id,
338 table_name: table_ref.to_string(),
339 table_id,
340 }
341 );
342 }
343 } else {
344 warn!(
345 "`name_to_ids` is not provided, table: {}, table_id: {}",
346 table_ref, table_id
347 );
348 }
349
350 let mut new_raw_table_meta = table_meta.clone();
351 let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
352 let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
353 let value_indices = &mut new_raw_table_meta.value_indices;
354 let time_index = &mut new_raw_table_meta.schema.timestamp_index;
355 let columns = &mut new_raw_table_meta.schema.column_schemas;
356 let column_ids = &mut new_raw_table_meta.column_ids;
357 let next_column_id = &mut new_raw_table_meta.next_column_id;
358
359 column_ids.clear();
360 value_indices.clear();
361 columns.clear();
362 primary_key_indices.clear();
363 partition_key_indices.clear();
364
365 for (idx, col) in column_metadata.iter().enumerate() {
366 if partition_key_names.contains(&col.column_schema.name.as_str()) {
367 partition_key_indices.push(idx);
368 }
369 match col.semantic_type {
370 SemanticType::Tag => {
371 primary_key_indices.push(idx);
372 }
373 SemanticType::Field => {
374 value_indices.push(idx);
375 }
376 SemanticType::Timestamp => {
377 value_indices.push(idx);
378 *time_index = Some(idx);
379 }
380 }
381
382 columns.push(col.column_schema.clone());
383 column_ids.push(col.column_id);
384 }
385
386 *next_column_id = column_ids
387 .iter()
388 .filter(|id| !ReservedColumnId::is_reserved(**id))
389 .max()
390 .map(|max| max + 1)
391 .unwrap_or(*next_column_id)
392 .max(*next_column_id);
393
394 if let Some(time_index) = *time_index {
395 new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
396 }
397
398 Ok(new_raw_table_meta)
399}
400
401pub(crate) fn need_update_logical_table_info(
406 table_info: &RawTableInfo,
407 column_metadatas: &[ColumnMetadata],
408) -> bool {
409 table_info.meta.schema.column_schemas.len() != column_metadatas.len()
410}
411
/// Outcome of waiting for subprocedures when at least one of them failed.
pub struct PartialSuccessResult<'a> {
    /// Subprocedures that finished with an error.
    pub failed_procedures: Vec<&'a SubprocedureMeta>,
    /// Subprocedures that finished successfully.
    pub success_procedures: Vec<&'a SubprocedureMeta>,
}

/// Result of `wait_for_inflight_subprocedures`.
pub enum WaitForInflightSubproceduresResult<'a> {
    /// Every subprocedure finished successfully.
    Success(Vec<&'a SubprocedureMeta>),
    /// Some subprocedures failed; only produced when waiting without fail-fast.
    PartialSuccess(PartialSuccessResult<'a>),
}
423
424pub(crate) async fn wait_for_inflight_subprocedures<'a>(
429 procedure_ctx: &ProcedureContext,
430 subprocedures: &'a [SubprocedureMeta],
431 fail_fast: bool,
432) -> Result<WaitForInflightSubproceduresResult<'a>> {
433 let mut receivers = Vec::with_capacity(subprocedures.len());
434 for subprocedure in subprocedures {
435 let procedure_id = subprocedure.procedure_id();
436 let receiver = procedure_ctx
437 .provider
438 .procedure_state_receiver(procedure_id)
439 .await
440 .context(ProcedureStateReceiverSnafu { procedure_id })?
441 .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
442 receivers.push((receiver, subprocedure));
443 }
444
445 let mut tasks = Vec::with_capacity(receivers.len());
446 for (receiver, subprocedure) in receivers.iter_mut() {
447 tasks.push(async move {
448 watcher::wait(receiver).await.inspect_err(|e| {
449 error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
450 })
451 });
452 }
453
454 if fail_fast {
455 try_join_all(tasks).await.context(WaitProcedureSnafu)?;
456 return Ok(WaitForInflightSubproceduresResult::Success(
457 subprocedures.iter().collect(),
458 ));
459 }
460
461 let results = join_all(tasks).await;
463 let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
464 if failed_procedures_num == 0 {
465 return Ok(WaitForInflightSubproceduresResult::Success(
466 subprocedures.iter().collect(),
467 ));
468 }
469 warn!(
470 "{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
471 failed_procedures_num,
472 subprocedures.len(),
473 procedure_ctx.procedure_id
474 );
475
476 let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
477 let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
478 for (result, subprocedure) in results.into_iter().zip(subprocedures) {
479 if result.is_err() {
480 failed_procedures.push(subprocedure);
481 } else {
482 success_procedures.push(subprocedure);
483 }
484 }
485
486 Ok(WaitForInflightSubproceduresResult::PartialSuccess(
487 PartialSuccessResult {
488 failed_procedures,
489 success_procedures,
490 },
491 ))
492}
493
/// Dependencies shared by the reconciliation procedures.
#[derive(Clone)]
pub struct Context {
    /// Node manager handle (presumably used to reach datanodes — confirm at call sites).
    pub node_manager: NodeManagerRef,
    /// Table metadata manager handle.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator (presumably invalidates cached table metadata after updates — confirm).
    pub cache_invalidator: CacheInvalidatorRef,
}
500
/// Describes a physical-table reconciliation subprocedure.
pub struct PhysicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table being reconciled.
    pub table_id: TableId,
    /// Name of the physical table being reconciled.
    pub table_name: TableName,
}

/// Describes a logical-tables reconciliation subprocedure.
pub struct LogicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table the logical tables belong to.
    pub physical_table_id: TableId,
    /// Name of the physical table the logical tables belong to.
    pub physical_table_name: TableName,
    /// Logical tables handled by this subprocedure; each one counts toward `table_num`.
    pub logical_tables: Vec<(TableId, TableName)>,
}

/// Describes a database reconciliation subprocedure.
pub struct ReconcileDatabaseMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Catalog containing the database.
    pub catalog: String,
    /// Name of the database (schema) being reconciled.
    pub schema: String,
}

/// A reconciliation subprocedure awaited by a parent procedure.
pub enum SubprocedureMeta {
    /// Reconciles a single physical table.
    PhysicalTable(PhysicalTableMeta),
    /// Reconciles a batch of logical tables of one physical table.
    LogicalTable(LogicalTableMeta),
    /// Reconciles a whole database.
    Database(ReconcileDatabaseMeta),
}
529
530impl Display for SubprocedureMeta {
531 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
532 match self {
533 SubprocedureMeta::PhysicalTable(meta) => {
534 write!(
535 f,
536 "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
537 meta.procedure_id, meta.table_id, meta.table_name
538 )
539 }
540 SubprocedureMeta::LogicalTable(meta) => {
541 write!(
542 f,
543 "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
544 meta.procedure_id,
545 meta.physical_table_id,
546 meta.physical_table_name,
547 meta.logical_tables
548 )
549 }
550 SubprocedureMeta::Database(meta) => {
551 write!(
552 f,
553 "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
554 meta.procedure_id, meta.catalog, meta.schema
555 )
556 }
557 }
558 }
559}
560
561impl SubprocedureMeta {
562 pub fn new_logical_table(
564 procedure_id: ProcedureId,
565 physical_table_id: TableId,
566 physical_table_name: TableName,
567 logical_tables: Vec<(TableId, TableName)>,
568 ) -> Self {
569 Self::LogicalTable(LogicalTableMeta {
570 procedure_id,
571 physical_table_id,
572 physical_table_name,
573 logical_tables,
574 })
575 }
576
577 pub fn new_physical_table(
579 procedure_id: ProcedureId,
580 table_id: TableId,
581 table_name: TableName,
582 ) -> Self {
583 Self::PhysicalTable(PhysicalTableMeta {
584 procedure_id,
585 table_id,
586 table_name,
587 })
588 }
589
590 pub fn new_reconcile_database(
592 procedure_id: ProcedureId,
593 catalog: String,
594 schema: String,
595 ) -> Self {
596 Self::Database(ReconcileDatabaseMeta {
597 procedure_id,
598 catalog,
599 schema,
600 })
601 }
602
603 pub fn procedure_id(&self) -> ProcedureId {
605 match self {
606 SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
607 SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
608 SubprocedureMeta::Database(meta) => meta.procedure_id,
609 }
610 }
611
612 pub fn table_num(&self) -> usize {
614 match self {
615 SubprocedureMeta::PhysicalTable(_) => 1,
616 SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
617 SubprocedureMeta::Database(_) => 0,
618 }
619 }
620
621 pub fn database_num(&self) -> usize {
623 match self {
624 SubprocedureMeta::Database(_) => 1,
625 _ => 0,
626 }
627 }
628}
629
/// Counters describing the outcome of a catalog reconciliation.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
    /// Number of databases reconciled successfully.
    pub succeeded_databases: usize,
    /// Number of databases whose reconciliation failed.
    pub failed_databases: usize,
}

impl AddAssign for ReconcileCatalogMetrics {
    /// Accumulates the counters of `other` into `self`.
    fn add_assign(&mut self, other: Self) {
        let Self {
            succeeded_databases,
            failed_databases,
        } = other;
        self.succeeded_databases += succeeded_databases;
        self.failed_databases += failed_databases;
    }
}

impl Display for ReconcileCatalogMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_databases,
            failed_databases,
        } = self;
        write!(
            f,
            "succeeded_databases: {}, failed_databases: {}",
            succeeded_databases, failed_databases
        )
    }
}
653
654impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
655 fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
656 match result {
657 WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
658 succeeded_databases: subprocedures.len(),
659 failed_databases: 0,
660 },
661 WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
662 failed_procedures,
663 success_procedures,
664 }) => {
665 let succeeded_databases = success_procedures
666 .iter()
667 .map(|subprocedure| subprocedure.database_num())
668 .sum();
669 let failed_databases = failed_procedures
670 .iter()
671 .map(|subprocedure| subprocedure.database_num())
672 .sum();
673 ReconcileCatalogMetrics {
674 succeeded_databases,
675 failed_databases,
676 }
677 }
678 }
679 }
680}
681
/// Counters describing the outcome of a database reconciliation.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
    /// Number of tables reconciled by successful subprocedures.
    pub succeeded_tables: usize,
    /// Number of tables covered by failed subprocedures.
    pub failed_tables: usize,
    /// Number of subprocedures that succeeded.
    pub succeeded_procedures: usize,
    /// Number of subprocedures that failed.
    pub failed_procedures: usize,
}

impl Display for ReconcileDatabaseMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = self;
        write!(
            f,
            "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}",
            succeeded_tables, failed_tables, succeeded_procedures, failed_procedures
        )
    }
}

impl AddAssign for ReconcileDatabaseMetrics {
    /// Accumulates the counters of `other` into `self`.
    fn add_assign(&mut self, other: Self) {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = other;
        self.succeeded_tables += succeeded_tables;
        self.failed_tables += failed_tables;
        self.succeeded_procedures += succeeded_procedures;
        self.failed_procedures += failed_procedures;
    }
}
712
713impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
714 fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
715 match result {
716 WaitForInflightSubproceduresResult::Success(subprocedures) => {
717 let table_num = subprocedures
718 .iter()
719 .map(|subprocedure| subprocedure.table_num())
720 .sum();
721 ReconcileDatabaseMetrics {
722 succeeded_procedures: subprocedures.len(),
723 failed_procedures: 0,
724 succeeded_tables: table_num,
725 failed_tables: 0,
726 }
727 }
728 WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
729 failed_procedures,
730 success_procedures,
731 }) => {
732 let succeeded_tables = success_procedures
733 .iter()
734 .map(|subprocedure| subprocedure.table_num())
735 .sum();
736 let failed_tables = failed_procedures
737 .iter()
738 .map(|subprocedure| subprocedure.table_num())
739 .sum();
740 ReconcileDatabaseMetrics {
741 succeeded_procedures: success_procedures.len(),
742 failed_procedures: failed_procedures.len(),
743 succeeded_tables,
744 failed_tables,
745 }
746 }
747 }
748 }
749}
750
/// Statistics collected while reconciling logical tables.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
    /// When reconciliation started; used to report elapsed time.
    pub start_time: Instant,
    /// Number of table info updates performed.
    pub update_table_info_count: usize,
    /// Number of logical tables created.
    pub create_tables_count: usize,
    /// Number of tables whose column metadata was consistent.
    pub column_metadata_consistent_count: usize,
    /// Number of tables whose column metadata was inconsistent.
    pub column_metadata_inconsistent_count: usize,
}

impl Default for ReconcileLogicalTableMetrics {
    /// Starts the clock now, with every counter at zero.
    fn default() -> Self {
        let start_time = Instant::now();
        Self {
            start_time,
            update_table_info_count: 0,
            create_tables_count: 0,
            column_metadata_consistent_count: 0,
            column_metadata_inconsistent_count: 0,
        }
    }
}

// Metric label values for the reconciliation statistics.
const CREATE_TABLES: &str = "create_tables";
const UPDATE_TABLE_INFO: &str = "update_table_info";
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";

impl ReconcileLogicalTableMetrics {
    /// Total number of tables processed: created tables, plus tables found with
    /// consistent or inconsistent column metadata.
    ///
    /// `update_table_info_count` is not included.
    pub fn total_table_count(&self) -> usize {
        [
            self.create_tables_count,
            self.column_metadata_consistent_count,
            self.column_metadata_inconsistent_count,
        ]
        .iter()
        .sum()
    }
}
786
787impl Drop for ReconcileLogicalTableMetrics {
788 fn drop(&mut self) {
789 let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
790 metrics::METRIC_META_RECONCILIATION_STATS
791 .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
792 .inc_by(self.create_tables_count as u64);
793 metrics::METRIC_META_RECONCILIATION_STATS
794 .with_label_values(&[
795 procedure_name,
796 metrics::TABLE_TYPE_LOGICAL,
797 UPDATE_TABLE_INFO,
798 ])
799 .inc_by(self.update_table_info_count as u64);
800 metrics::METRIC_META_RECONCILIATION_STATS
801 .with_label_values(&[
802 procedure_name,
803 metrics::TABLE_TYPE_LOGICAL,
804 COLUMN_METADATA_CONSISTENT,
805 ])
806 .inc_by(self.column_metadata_consistent_count as u64);
807 metrics::METRIC_META_RECONCILIATION_STATS
808 .with_label_values(&[
809 procedure_name,
810 metrics::TABLE_TYPE_LOGICAL,
811 COLUMN_METADATA_INCONSISTENT,
812 ])
813 .inc_by(self.column_metadata_inconsistent_count as u64);
814 }
815}
816
817impl Display for ReconcileLogicalTableMetrics {
818 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
819 let elapsed = self.start_time.elapsed();
820 if self.create_tables_count > 0 {
821 write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
822 }
823 if self.update_table_info_count > 0 {
824 write!(
825 f,
826 "update_table_info_count: {}, ",
827 self.update_table_info_count
828 )?;
829 }
830 if self.column_metadata_consistent_count > 0 {
831 write!(
832 f,
833 "column_metadata_consistent_count: {}, ",
834 self.column_metadata_consistent_count
835 )?;
836 }
837 if self.column_metadata_inconsistent_count > 0 {
838 write!(
839 f,
840 "column_metadata_inconsistent_count: {}, ",
841 self.column_metadata_inconsistent_count
842 )?;
843 }
844
845 write!(
846 f,
847 "total_table_count: {}, elapsed: {:?}",
848 self.total_table_count(),
849 elapsed
850 )
851 }
852}
853
854#[derive(Clone, Copy)]
856pub enum ResolveColumnMetadataResult {
857 Consistent,
858 Inconsistent(ResolveStrategy),
859}
860
861impl Display for ResolveColumnMetadataResult {
862 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
863 match self {
864 ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
865 ResolveColumnMetadataResult::Inconsistent(strategy) => {
866 let strategy_str = strategy.as_ref();
867 write!(f, "Inconsistent({})", strategy_str)
868 }
869 }
870 }
871}
872
/// Statistics collected while reconciling a physical table.
///
/// They are recorded into the reconciliation metrics when the value is dropped.
#[derive(Clone)]
pub struct ReconcileTableMetrics {
    /// When reconciliation started; used to report elapsed time.
    pub start_time: Instant,
    /// How the column metadata was resolved, if resolution happened.
    pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
    /// Whether the table info was updated.
    pub update_table_info: bool,
}
883
884impl Drop for ReconcileTableMetrics {
885 fn drop(&mut self) {
886 if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
887 match resolve_column_metadata_result {
888 ResolveColumnMetadataResult::Consistent => {
889 metrics::METRIC_META_RECONCILIATION_STATS
890 .with_label_values(&[
891 ReconcileTableProcedure::TYPE_NAME,
892 metrics::TABLE_TYPE_PHYSICAL,
893 COLUMN_METADATA_CONSISTENT,
894 ])
895 .inc();
896 }
897 ResolveColumnMetadataResult::Inconsistent(strategy) => {
898 metrics::METRIC_META_RECONCILIATION_STATS
899 .with_label_values(&[
900 ReconcileTableProcedure::TYPE_NAME,
901 metrics::TABLE_TYPE_PHYSICAL,
902 COLUMN_METADATA_INCONSISTENT,
903 ])
904 .inc();
905 metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
906 .with_label_values(&[strategy.as_ref()])
907 .inc();
908 }
909 }
910 }
911 if self.update_table_info {
912 metrics::METRIC_META_RECONCILIATION_STATS
913 .with_label_values(&[
914 ReconcileTableProcedure::TYPE_NAME,
915 metrics::TABLE_TYPE_PHYSICAL,
916 UPDATE_TABLE_INFO,
917 ])
918 .inc();
919 }
920 }
921}
922
923impl Default for ReconcileTableMetrics {
924 fn default() -> Self {
925 Self {
926 start_time: Instant::now(),
927 resolve_column_metadata_result: None,
928 update_table_info: false,
929 }
930 }
931}
932
933impl Display for ReconcileTableMetrics {
934 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
935 let elapsed = self.start_time.elapsed();
936 if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
937 write!(
938 f,
939 "resolve_column_metadata_result: {}, ",
940 resolve_column_metadata_result
941 )?;
942 }
943 write!(
944 f,
945 "update_table_info: {}, elapsed: {:?}",
946 self.update_table_info, elapsed
947 )
948 }
949}
950
951#[cfg(test)]
952mod tests {
953 use std::assert_matches::assert_matches;
954 use std::collections::HashMap;
955 use std::sync::Arc;
956
957 use api::v1::SemanticType;
958 use datatypes::prelude::ConcreteDataType;
959 use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
960 use store_api::metadata::ColumnMetadata;
961 use store_api::storage::RegionId;
962 use table::metadata::{RawTableMeta, TableMetaBuilder};
963 use table::table_reference::TableReference;
964
965 use super::*;
966 use crate::ddl::test_util::region_metadata::build_region_metadata;
967 use crate::error::Error;
968 use crate::reconciliation::utils::check_column_metadatas_consistent;
969
970 fn new_test_schema() -> Schema {
971 let column_schemas = vec![
972 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
973 ColumnSchema::new(
974 "ts",
975 ConcreteDataType::timestamp_millisecond_datatype(),
976 false,
977 )
978 .with_time_index(true),
979 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
980 ];
981 SchemaBuilder::try_from(column_schemas)
982 .unwrap()
983 .version(123)
984 .build()
985 .unwrap()
986 }
987
988 fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
989 vec![
990 ColumnMetadata {
991 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
992 semantic_type: SemanticType::Tag,
993 column_id: 0,
994 },
995 ColumnMetadata {
996 column_schema: ColumnSchema::new(
997 "ts",
998 ConcreteDataType::timestamp_millisecond_datatype(),
999 false,
1000 )
1001 .with_time_index(true),
1002 semantic_type: SemanticType::Timestamp,
1003 column_id: 1,
1004 },
1005 ColumnMetadata {
1006 column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1007 semantic_type: SemanticType::Field,
1008 column_id: 2,
1009 },
1010 ]
1011 }
1012
1013 fn new_test_raw_table_info() -> RawTableMeta {
1014 let mut table_meta_builder = TableMetaBuilder::empty();
1015 let table_meta = table_meta_builder
1016 .schema(Arc::new(new_test_schema()))
1017 .primary_key_indices(vec![0])
1018 .partition_key_indices(vec![2])
1019 .next_column_id(4)
1020 .build()
1021 .unwrap();
1022
1023 table_meta.into()
1024 }
1025
1026 #[test]
1027 fn test_build_table_info_from_column_metadatas_identical() {
1028 let column_metadatas = new_test_column_metadatas();
1029 let table_id = 1;
1030 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1031 let mut table_meta = new_test_raw_table_info();
1032 table_meta.column_ids = vec![0, 1, 2];
1033 let name_to_ids = HashMap::from([
1034 ("col1".to_string(), 0),
1035 ("ts".to_string(), 1),
1036 ("col2".to_string(), 2),
1037 ]);
1038
1039 let new_table_meta = build_table_meta_from_column_metadatas(
1040 table_id,
1041 table_ref,
1042 &table_meta,
1043 Some(name_to_ids),
1044 &column_metadatas,
1045 )
1046 .unwrap();
1047 assert_eq!(new_table_meta, table_meta);
1048 }
1049
1050 #[test]
1051 fn test_build_table_info_from_column_metadatas() {
1052 let mut column_metadatas = new_test_column_metadatas();
1053 column_metadatas.push(ColumnMetadata {
1054 column_schema: ColumnSchema::new(
1055 "__table_id",
1056 ConcreteDataType::string_datatype(),
1057 true,
1058 ),
1059 semantic_type: SemanticType::Tag,
1060 column_id: ReservedColumnId::table_id(),
1061 });
1062
1063 let table_id = 1;
1064 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1065 let table_meta = new_test_raw_table_info();
1066 let name_to_ids = HashMap::from([
1067 ("col1".to_string(), 0),
1068 ("ts".to_string(), 1),
1069 ("col2".to_string(), 2),
1070 ]);
1071
1072 let new_table_meta = build_table_meta_from_column_metadatas(
1073 table_id,
1074 table_ref,
1075 &table_meta,
1076 Some(name_to_ids),
1077 &column_metadatas,
1078 )
1079 .unwrap();
1080
1081 assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
1082 assert_eq!(new_table_meta.partition_key_indices, vec![2]);
1083 assert_eq!(new_table_meta.value_indices, vec![1, 2]);
1084 assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
1085 assert_eq!(
1086 new_table_meta.column_ids,
1087 vec![0, 1, 2, ReservedColumnId::table_id()]
1088 );
1089 assert_eq!(new_table_meta.next_column_id, table_meta.next_column_id);
1090 }
1091
1092 #[test]
1093 fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
1094 let column_metadatas = new_test_column_metadatas();
1095 let table_id = 1;
1096 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1097 let table_meta = new_test_raw_table_info();
1098 let name_to_ids = HashMap::from([
1099 ("col1".to_string(), 0),
1100 ("ts".to_string(), 1),
1101 ("col2".to_string(), 3),
1103 ]);
1104
1105 let err = build_table_meta_from_column_metadatas(
1106 table_id,
1107 table_ref,
1108 &table_meta,
1109 Some(name_to_ids),
1110 &column_metadatas,
1111 )
1112 .unwrap_err();
1113
1114 assert_matches!(err, Error::MismatchColumnId { .. });
1115 }
1116
1117 #[test]
1118 fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
1119 let mut column_metadatas = new_test_column_metadatas();
1120 column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1121 let table_id = 1;
1122 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1123 let table_meta = new_test_raw_table_info();
1124 let name_to_ids = HashMap::from([
1125 ("col1".to_string(), 0),
1126 ("ts".to_string(), 1),
1127 ("col2".to_string(), 2),
1128 ]);
1129
1130 let err = build_table_meta_from_column_metadatas(
1131 table_id,
1132 table_ref,
1133 &table_meta,
1134 Some(name_to_ids),
1135 &column_metadatas,
1136 )
1137 .unwrap_err();
1138
1139 assert!(
1140 err.to_string()
1141 .contains("Missing table index in column metadata"),
1142 "err: {}",
1143 err
1144 );
1145 }
1146
1147 #[test]
1148 fn test_build_table_info_from_column_metadatas_with_missing_column() {
1149 let mut column_metadatas = new_test_column_metadatas();
1150 column_metadatas.retain(|c| c.column_id != 0);
1152 let table_id = 1;
1153 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1154 let table_meta = new_test_raw_table_info();
1155 let name_to_ids = HashMap::from([
1156 ("col1".to_string(), 0),
1157 ("ts".to_string(), 1),
1158 ("col2".to_string(), 2),
1159 ]);
1160
1161 let err = build_table_meta_from_column_metadatas(
1162 table_id,
1163 table_ref,
1164 &table_meta,
1165 Some(name_to_ids.clone()),
1166 &column_metadatas,
1167 )
1168 .unwrap_err();
1169 assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1170
1171 let mut column_metadatas = new_test_column_metadatas();
1172 column_metadatas.retain(|c| c.column_id != 2);
1174
1175 let err = build_table_meta_from_column_metadatas(
1176 table_id,
1177 table_ref,
1178 &table_meta,
1179 Some(name_to_ids),
1180 &column_metadatas,
1181 )
1182 .unwrap_err();
1183 assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1184 }
1185
1186 #[test]
1187 fn test_check_column_metadatas_consistent() {
1188 let column_metadatas = new_test_column_metadatas();
1189 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1190 let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1191 let result =
1192 check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
1193 assert_eq!(result, column_metadatas);
1194
1195 let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
1196 let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1197 let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
1198 assert!(result.is_none());
1199 }
1200
1201 #[test]
1202 fn test_check_column_metadata_invariants() {
1203 let column_metadatas = new_test_column_metadatas();
1204 let mut new_column_metadatas = column_metadatas.clone();
1205 new_column_metadatas.push(ColumnMetadata {
1206 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1207 semantic_type: SemanticType::Field,
1208 column_id: 3,
1209 });
1210 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
1211 }
1212
1213 #[test]
1214 fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
1215 let column_metadatas = new_test_column_metadatas();
1216 let mut new_column_metadatas = column_metadatas.clone();
1217 new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1218 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1219
1220 let column_metadatas = new_test_column_metadatas();
1221 let mut new_column_metadatas = column_metadatas.clone();
1222 new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
1223 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1224 }
1225
1226 #[test]
1227 fn test_check_column_metadata_invariants_mismatch_column_id() {
1228 let column_metadatas = new_test_column_metadatas();
1229 let mut new_column_metadatas = column_metadatas.clone();
1230 if let Some(col) = new_column_metadatas
1231 .iter_mut()
1232 .find(|c| c.semantic_type == SemanticType::Timestamp)
1233 {
1234 col.column_id = 100;
1235 }
1236 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1237
1238 let column_metadatas = new_test_column_metadatas();
1239 let mut new_column_metadatas = column_metadatas.clone();
1240 if let Some(col) = new_column_metadatas
1241 .iter_mut()
1242 .find(|c| c.semantic_type == SemanticType::Tag)
1243 {
1244 col.column_id = 100;
1245 }
1246 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1247 }
1248
1249 #[test]
1250 fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
1251 let column_metadatas = new_test_column_metadatas();
1252 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1253 let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
1254 metasrv_column_metadatas.push(ColumnMetadata {
1255 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1256 semantic_type: SemanticType::Field,
1257 column_id: 3,
1258 });
1259 let result =
1260 resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
1261 .unwrap();
1262
1263 assert_eq!(result, vec![RegionId::new(1024, 0)]);
1264 }
1265
1266 #[test]
1267 fn test_resolve_column_metadatas_with_use_latest_strategy() {
1268 let column_metadatas = new_test_column_metadatas();
1269 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1270 let mut new_column_metadatas = column_metadatas.clone();
1271 new_column_metadatas.push(ColumnMetadata {
1272 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1273 semantic_type: SemanticType::Field,
1274 column_id: 3,
1275 });
1276
1277 let mut region_metadata2 =
1278 build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
1279 region_metadata2.schema_version = 2;
1280
1281 let (resolved_column_metadatas, region_ids) =
1282 resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
1283 assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
1284 assert_eq!(resolved_column_metadatas, new_column_metadatas);
1285 }
1286}