1use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display};
17use std::ops::AddAssign;
18use std::time::Instant;
19
20use api::v1::SemanticType;
21use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
22use common_telemetry::{error, warn};
23use datatypes::schema::ColumnSchema;
24use futures::future::{join_all, try_join_all};
25use snafu::{ensure, OptionExt, ResultExt};
26use store_api::metadata::{ColumnMetadata, RegionMetadata};
27use store_api::storage::{RegionId, TableId};
28use table::metadata::{RawTableInfo, RawTableMeta};
29use table::table_name::TableName;
30use table::table_reference::TableReference;
31
32use crate::cache_invalidator::CacheInvalidatorRef;
33use crate::error::{
34 ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
35 MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
36 ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
37 WaitProcedureSnafu,
38};
39use crate::key::TableMetadataManagerRef;
40use crate::metrics;
41use crate::node_manager::NodeManagerRef;
42use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
43use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
44use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
45
/// A borrowed view of the parts of a [`RegionMetadata`] that are compared between regions:
/// the column metadatas, the primary key and the table id.
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct PartialRegionMetadata<'a> {
    /// Column metadatas borrowed from the region metadata.
    pub(crate) column_metadatas: &'a [ColumnMetadata],
    /// Primary key borrowed from the region metadata.
    pub(crate) primary_key: &'a [u32],
    /// Id of the table the region belongs to.
    pub(crate) table_id: TableId,
}
52
53impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
54 fn from(region_metadata: &'a RegionMetadata) -> Self {
55 Self {
56 column_metadatas: ®ion_metadata.column_metadatas,
57 primary_key: ®ion_metadata.primary_key,
58 table_id: region_metadata.region_id.table_id(),
59 }
60 }
61}
62
63pub(crate) fn check_column_metadatas_consistent(
73 region_metadatas: &[RegionMetadata],
74) -> Option<Vec<ColumnMetadata>> {
75 let is_column_metadata_consistent = region_metadatas
76 .windows(2)
77 .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
78
79 if !is_column_metadata_consistent {
80 return None;
81 }
82
83 Some(region_metadatas[0].column_metadatas.clone())
84}
85
86pub(crate) fn resolve_column_metadatas_with_metasrv(
94 column_metadatas: &[ColumnMetadata],
95 region_metadatas: &[RegionMetadata],
96) -> Result<Vec<RegionId>> {
97 let is_same_table = region_metadatas
98 .windows(2)
99 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
100
101 ensure!(
102 is_same_table,
103 UnexpectedSnafu {
104 err_msg: "Region metadatas are not from the same table"
105 }
106 );
107
108 let mut regions_ids = vec![];
109 for region_metadata in region_metadatas {
110 if region_metadata.column_metadatas != column_metadatas {
111 check_column_metadata_invariants(column_metadatas, ®ion_metadata.column_metadatas)?;
112 regions_ids.push(region_metadata.region_id);
113 }
114 }
115 Ok(regions_ids)
116}
117
118pub(crate) fn resolve_column_metadatas_with_latest(
126 region_metadatas: &[RegionMetadata],
127) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
128 let is_same_table = region_metadatas
129 .windows(2)
130 .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
131
132 ensure!(
133 is_same_table,
134 UnexpectedSnafu {
135 err_msg: "Region metadatas are not from the same table"
136 }
137 );
138
139 let latest_region_metadata = region_metadatas
140 .iter()
141 .max_by_key(|c| c.schema_version)
142 .context(UnexpectedSnafu {
143 err_msg: "All Region metadatas have the same schema version",
144 })?;
145 let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
146
147 let mut region_ids = vec![];
148 for region_metadata in region_metadatas {
149 if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
150 check_column_metadata_invariants(
151 &latest_region_metadata.column_metadatas,
152 ®ion_metadata.column_metadatas,
153 )?;
154 region_ids.push(region_metadata.region_id);
155 }
156 }
157
158 Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
160}
161
162pub(crate) fn build_column_metadata_from_table_info(
170 column_schemas: &[ColumnSchema],
171 primary_key_indexes: &[usize],
172 name_to_ids: &HashMap<String, u32>,
173) -> Result<Vec<ColumnMetadata>> {
174 let primary_names = primary_key_indexes
175 .iter()
176 .map(|i| column_schemas[*i].name.as_str())
177 .collect::<HashSet<_>>();
178
179 column_schemas
180 .iter()
181 .map(|column_schema| {
182 let column_id = *name_to_ids
183 .get(column_schema.name.as_str())
184 .with_context(|| UnexpectedSnafu {
185 err_msg: format!(
186 "Column name {} not found in name_to_ids",
187 column_schema.name
188 ),
189 })?;
190
191 let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
192 SemanticType::Tag
193 } else if column_schema.is_time_index() {
194 SemanticType::Timestamp
195 } else {
196 SemanticType::Field
197 };
198 Ok(ColumnMetadata {
199 column_schema: column_schema.clone(),
200 semantic_type,
201 column_id,
202 })
203 })
204 .collect::<Result<Vec<_>>>()
205}
206
207pub(crate) fn check_column_metadata_invariants(
213 new_column_metadatas: &[ColumnMetadata],
214 column_metadatas: &[ColumnMetadata],
215) -> Result<()> {
216 let new_primary_keys = new_column_metadatas
217 .iter()
218 .filter(|c| c.semantic_type == SemanticType::Tag)
219 .map(|c| (c.column_schema.name.as_str(), c.column_id))
220 .collect::<HashMap<_, _>>();
221
222 let old_primary_keys = column_metadatas
223 .iter()
224 .filter(|c| c.semantic_type == SemanticType::Tag)
225 .map(|c| (c.column_schema.name.as_str(), c.column_id));
226
227 for (name, id) in old_primary_keys {
228 let column_id = new_primary_keys
229 .get(name)
230 .cloned()
231 .context(ColumnNotFoundSnafu {
232 column_name: name,
233 column_id: id,
234 })?;
235
236 ensure!(
237 column_id == id,
238 ColumnIdMismatchSnafu {
239 column_name: name,
240 expected_column_id: id,
241 actual_column_id: column_id,
242 }
243 );
244 }
245
246 let new_ts_column = new_column_metadatas
247 .iter()
248 .find(|c| c.semantic_type == SemanticType::Timestamp)
249 .map(|c| (c.column_schema.name.as_str(), c.column_id))
250 .context(UnexpectedSnafu {
251 err_msg: "Timestamp column not found in new column metadata",
252 })?;
253
254 let old_ts_column = column_metadatas
255 .iter()
256 .find(|c| c.semantic_type == SemanticType::Timestamp)
257 .map(|c| (c.column_schema.name.as_str(), c.column_id))
258 .context(UnexpectedSnafu {
259 err_msg: "Timestamp column not found in column metadata",
260 })?;
261 ensure!(
262 new_ts_column == old_ts_column,
263 TimestampMismatchSnafu {
264 expected_column_name: old_ts_column.0,
265 expected_column_id: old_ts_column.1,
266 actual_column_name: new_ts_column.0,
267 actual_column_id: new_ts_column.1,
268 }
269 );
270
271 Ok(())
272}
273
274pub(crate) fn build_table_meta_from_column_metadatas(
284 table_id: TableId,
285 table_ref: TableReference,
286 table_meta: &RawTableMeta,
287 name_to_ids: Option<HashMap<String, u32>>,
288 column_metadata: &[ColumnMetadata],
289) -> Result<RawTableMeta> {
290 let column_in_column_metadata = column_metadata
291 .iter()
292 .map(|c| (c.column_schema.name.as_str(), c))
293 .collect::<HashMap<_, _>>();
294 let primary_key_names = table_meta
295 .primary_key_indices
296 .iter()
297 .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
298 .collect::<HashSet<_>>();
299 let partition_key_names = table_meta
300 .partition_key_indices
301 .iter()
302 .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
303 .collect::<HashSet<_>>();
304 ensure!(
305 column_metadata
306 .iter()
307 .any(|c| c.semantic_type == SemanticType::Timestamp),
308 UnexpectedSnafu {
309 err_msg: format!(
310 "Missing table index in column metadata, table: {}, table_id: {}",
311 table_ref, table_id
312 ),
313 }
314 );
315
316 if let Some(name_to_ids) = &name_to_ids {
317 for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
319 let column_in_column_metadata = column_in_column_metadata
320 .get(column_name)
321 .with_context(|| MissingColumnInColumnMetadataSnafu {
322 column_name: column_name.to_string(),
323 table_name: table_ref.to_string(),
324 table_id,
325 })?;
326
327 let column_id = *name_to_ids
328 .get(*column_name)
329 .with_context(|| UnexpectedSnafu {
330 err_msg: format!("column id not found in name_to_ids: {}", column_name),
331 })?;
332 ensure!(
333 column_id == column_in_column_metadata.column_id,
334 MismatchColumnIdSnafu {
335 column_name: column_name.to_string(),
336 column_id,
337 table_name: table_ref.to_string(),
338 table_id,
339 }
340 );
341 }
342 } else {
343 warn!(
344 "`name_to_ids` is not provided, table: {}, table_id: {}",
345 table_ref, table_id
346 );
347 }
348
349 let mut new_raw_table_meta = table_meta.clone();
350 let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
351 let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
352 let value_indices = &mut new_raw_table_meta.value_indices;
353 let time_index = &mut new_raw_table_meta.schema.timestamp_index;
354 let columns = &mut new_raw_table_meta.schema.column_schemas;
355 let column_ids = &mut new_raw_table_meta.column_ids;
356 let next_column_id = &mut new_raw_table_meta.next_column_id;
357
358 column_ids.clear();
359 value_indices.clear();
360 columns.clear();
361 primary_key_indices.clear();
362 partition_key_indices.clear();
363
364 for (idx, col) in column_metadata.iter().enumerate() {
365 if partition_key_names.contains(&col.column_schema.name.as_str()) {
366 partition_key_indices.push(idx);
367 }
368 match col.semantic_type {
369 SemanticType::Tag => {
370 primary_key_indices.push(idx);
371 }
372 SemanticType::Field => {
373 value_indices.push(idx);
374 }
375 SemanticType::Timestamp => {
376 value_indices.push(idx);
377 *time_index = Some(idx);
378 }
379 }
380
381 columns.push(col.column_schema.clone());
382 column_ids.push(col.column_id);
383 }
384
385 *next_column_id = column_ids
386 .iter()
387 .max()
388 .map(|max| max + 1)
389 .unwrap_or(*next_column_id)
390 .max(*next_column_id);
391
392 if let Some(time_index) = *time_index {
393 new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
394 }
395
396 Ok(new_raw_table_meta)
397}
398
399pub(crate) fn need_update_logical_table_info(
404 table_info: &RawTableInfo,
405 column_metadatas: &[ColumnMetadata],
406) -> bool {
407 table_info.meta.schema.column_schemas.len() != column_metadatas.len()
408}
409
/// Outcome of waiting for subprocedures when at least one of them failed.
pub struct PartialSuccessResult<'a> {
    /// Subprocedures whose wait returned an error.
    pub failed_procedures: Vec<&'a SubprocedureMeta>,
    /// Subprocedures whose wait completed without error.
    pub success_procedures: Vec<&'a SubprocedureMeta>,
}
415
/// Result of [`wait_for_inflight_subprocedures`].
pub enum WaitForInflightSubproceduresResult<'a> {
    /// Every subprocedure completed without error; carries all of them.
    Success(Vec<&'a SubprocedureMeta>),
    /// At least one subprocedure failed; carries the failed and the successful ones.
    PartialSuccess(PartialSuccessResult<'a>),
}
421
/// Waits for all given subprocedures to finish.
///
/// A state receiver is obtained from the procedure context's provider for each
/// subprocedure, then all of them are awaited concurrently.
///
/// - With `fail_fast`, the first failure is returned as an error (`WaitProcedure`);
///   otherwise the result is `Success` with every subprocedure.
/// - Without `fail_fast`, all waits run to completion. The result is `Success` if none
///   failed, otherwise `PartialSuccess` with the failed and successful subprocedures.
///
/// # Errors
///
/// Returns an error if a state receiver cannot be fetched or does not exist, or, with
/// `fail_fast`, if any subprocedure fails.
pub(crate) async fn wait_for_inflight_subprocedures<'a>(
    procedure_ctx: &ProcedureContext,
    subprocedures: &'a [SubprocedureMeta],
    fail_fast: bool,
) -> Result<WaitForInflightSubproceduresResult<'a>> {
    // Collect one state receiver per subprocedure, keeping input order.
    let mut receivers = Vec::with_capacity(subprocedures.len());
    for subprocedure in subprocedures {
        let procedure_id = subprocedure.procedure_id();
        let receiver = procedure_ctx
            .provider
            .procedure_state_receiver(procedure_id)
            .await
            .context(ProcedureStateReceiverSnafu { procedure_id })?
            .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
        receivers.push((receiver, subprocedure));
    }

    // One wait future per receiver; failures are logged as they are observed.
    let mut tasks = Vec::with_capacity(receivers.len());
    for (receiver, subprocedure) in receivers.iter_mut() {
        tasks.push(async move {
            watcher::wait(receiver).await.inspect_err(|e| {
                error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
            })
        });
    }

    if fail_fast {
        // Stops at the first error.
        try_join_all(tasks).await.context(WaitProcedureSnafu)?;
        return Ok(WaitForInflightSubproceduresResult::Success(
            subprocedures.iter().collect(),
        ));
    }

    // Results are in the same order as `subprocedures`.
    let results = join_all(tasks).await;
    let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
    if failed_procedures_num == 0 {
        return Ok(WaitForInflightSubproceduresResult::Success(
            subprocedures.iter().collect(),
        ));
    }
    warn!(
        "{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
        failed_procedures_num,
        subprocedures.len(),
        procedure_ctx.procedure_id
    );

    // Split the subprocedures by outcome.
    let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
    let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
    for (result, subprocedure) in results.into_iter().zip(subprocedures) {
        if result.is_err() {
            failed_procedures.push(subprocedure);
        } else {
            success_procedures.push(subprocedure);
        }
    }

    Ok(WaitForInflightSubproceduresResult::PartialSuccess(
        PartialSuccessResult {
            failed_procedures,
            success_procedures,
        },
    ))
}
491
/// Shared handles used by the reconciliation code. The struct is `Clone`.
#[derive(Clone)]
pub struct Context {
    /// Handle to the node manager.
    pub node_manager: NodeManagerRef,
    /// Handle to the table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Handle to the cache invalidator.
    pub cache_invalidator: CacheInvalidatorRef,
}
498
/// Metadata of a subprocedure that reconciles a physical table.
pub struct PhysicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the table.
    pub table_id: TableId,
    /// Name of the table.
    pub table_name: TableName,
}
505
/// Metadata of a subprocedure that reconciles logical tables.
pub struct LogicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table.
    pub physical_table_id: TableId,
    /// Name of the physical table.
    pub physical_table_name: TableName,
    /// Id and name of each logical table handled by the subprocedure.
    pub logical_tables: Vec<(TableId, TableName)>,
}
513
/// Metadata of a subprocedure that reconciles a database.
pub struct ReconcileDatabaseMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Catalog name.
    pub catalog: String,
    /// Schema name.
    pub schema: String,
}
520
/// Metadata of an inflight reconciliation subprocedure.
pub enum SubprocedureMeta {
    /// A physical table reconciliation.
    PhysicalTable(PhysicalTableMeta),
    /// A logical tables reconciliation.
    LogicalTable(LogicalTableMeta),
    /// A database reconciliation.
    Database(ReconcileDatabaseMeta),
}
527
528impl Display for SubprocedureMeta {
529 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
530 match self {
531 SubprocedureMeta::PhysicalTable(meta) => {
532 write!(
533 f,
534 "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
535 meta.procedure_id, meta.table_id, meta.table_name
536 )
537 }
538 SubprocedureMeta::LogicalTable(meta) => {
539 write!(
540 f,
541 "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
542 meta.procedure_id, meta.physical_table_id, meta.physical_table_name, meta.logical_tables
543 )
544 }
545 SubprocedureMeta::Database(meta) => {
546 write!(
547 f,
548 "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
549 meta.procedure_id, meta.catalog, meta.schema
550 )
551 }
552 }
553 }
554}
555
556impl SubprocedureMeta {
557 pub fn new_logical_table(
559 procedure_id: ProcedureId,
560 physical_table_id: TableId,
561 physical_table_name: TableName,
562 logical_tables: Vec<(TableId, TableName)>,
563 ) -> Self {
564 Self::LogicalTable(LogicalTableMeta {
565 procedure_id,
566 physical_table_id,
567 physical_table_name,
568 logical_tables,
569 })
570 }
571
572 pub fn new_physical_table(
574 procedure_id: ProcedureId,
575 table_id: TableId,
576 table_name: TableName,
577 ) -> Self {
578 Self::PhysicalTable(PhysicalTableMeta {
579 procedure_id,
580 table_id,
581 table_name,
582 })
583 }
584
585 pub fn new_reconcile_database(
587 procedure_id: ProcedureId,
588 catalog: String,
589 schema: String,
590 ) -> Self {
591 Self::Database(ReconcileDatabaseMeta {
592 procedure_id,
593 catalog,
594 schema,
595 })
596 }
597
598 pub fn procedure_id(&self) -> ProcedureId {
600 match self {
601 SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
602 SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
603 SubprocedureMeta::Database(meta) => meta.procedure_id,
604 }
605 }
606
607 pub fn table_num(&self) -> usize {
609 match self {
610 SubprocedureMeta::PhysicalTable(_) => 1,
611 SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
612 SubprocedureMeta::Database(_) => 0,
613 }
614 }
615
616 pub fn database_num(&self) -> usize {
618 match self {
619 SubprocedureMeta::Database(_) => 1,
620 _ => 0,
621 }
622 }
623}
624
/// Counters of succeeded and failed databases.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
    /// Number of databases that succeeded.
    pub succeeded_databases: usize,
    /// Number of databases that failed.
    pub failed_databases: usize,
}

impl AddAssign for ReconcileCatalogMetrics {
    /// Adds each counter of `other` to the matching counter of `self`.
    fn add_assign(&mut self, other: Self) {
        let Self {
            succeeded_databases,
            failed_databases,
        } = other;
        self.succeeded_databases += succeeded_databases;
        self.failed_databases += failed_databases;
    }
}

impl Display for ReconcileCatalogMetrics {
    /// Writes both counters as `name: value` pairs separated by a comma.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_databases,
            failed_databases,
        } = self;
        write!(
            f,
            "succeeded_databases: {}, failed_databases: {}",
            succeeded_databases, failed_databases
        )
    }
}
648
649impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
650 fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
651 match result {
652 WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
653 succeeded_databases: subprocedures.len(),
654 failed_databases: 0,
655 },
656 WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
657 failed_procedures,
658 success_procedures,
659 }) => {
660 let succeeded_databases = success_procedures
661 .iter()
662 .map(|subprocedure| subprocedure.database_num())
663 .sum();
664 let failed_databases = failed_procedures
665 .iter()
666 .map(|subprocedure| subprocedure.database_num())
667 .sum();
668 ReconcileCatalogMetrics {
669 succeeded_databases,
670 failed_databases,
671 }
672 }
673 }
674 }
675}
676
/// Counters of succeeded and failed tables and procedures.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
    /// Number of tables that succeeded.
    pub succeeded_tables: usize,
    /// Number of tables that failed.
    pub failed_tables: usize,
    /// Number of procedures that succeeded.
    pub succeeded_procedures: usize,
    /// Number of procedures that failed.
    pub failed_procedures: usize,
}

impl Display for ReconcileDatabaseMetrics {
    /// Writes the four counters as `name: value` pairs separated by commas.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = self;
        write!(
            f,
            "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}",
            succeeded_tables, failed_tables, succeeded_procedures, failed_procedures
        )
    }
}

impl AddAssign for ReconcileDatabaseMetrics {
    /// Adds each counter of `other` to the matching counter of `self`.
    fn add_assign(&mut self, other: Self) {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = other;
        self.succeeded_tables += succeeded_tables;
        self.failed_tables += failed_tables;
        self.succeeded_procedures += succeeded_procedures;
        self.failed_procedures += failed_procedures;
    }
}
700
701impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
702 fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
703 match result {
704 WaitForInflightSubproceduresResult::Success(subprocedures) => {
705 let table_num = subprocedures
706 .iter()
707 .map(|subprocedure| subprocedure.table_num())
708 .sum();
709 ReconcileDatabaseMetrics {
710 succeeded_procedures: subprocedures.len(),
711 failed_procedures: 0,
712 succeeded_tables: table_num,
713 failed_tables: 0,
714 }
715 }
716 WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
717 failed_procedures,
718 success_procedures,
719 }) => {
720 let succeeded_tables = success_procedures
721 .iter()
722 .map(|subprocedure| subprocedure.table_num())
723 .sum();
724 let failed_tables = failed_procedures
725 .iter()
726 .map(|subprocedure| subprocedure.table_num())
727 .sum();
728 ReconcileDatabaseMetrics {
729 succeeded_procedures: success_procedures.len(),
730 failed_procedures: failed_procedures.len(),
731 succeeded_tables,
732 failed_tables,
733 }
734 }
735 }
736 }
737}
738
/// Counters collected while reconciling logical tables, plus the start time used to
/// report the elapsed duration.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
    /// Instant at which the metrics were created.
    pub start_time: Instant,
    /// Number of table info updates.
    pub update_table_info_count: usize,
    /// Number of created tables.
    pub create_tables_count: usize,
    /// Number of tables whose column metadata was consistent.
    pub column_metadata_consistent_count: usize,
    /// Number of tables whose column metadata was inconsistent.
    pub column_metadata_inconsistent_count: usize,
}

impl Default for ReconcileLogicalTableMetrics {
    /// Starts the clock now and zeroes every counter.
    fn default() -> Self {
        let start_time = Instant::now();
        Self {
            start_time,
            update_table_info_count: 0,
            create_tables_count: 0,
            column_metadata_consistent_count: 0,
            column_metadata_inconsistent_count: 0,
        }
    }
}
760
// Label values passed to the reconciliation stats metric by the `Drop` impls in this file.
const CREATE_TABLES: &str = "create_tables";
const UPDATE_TABLE_INFO: &str = "update_table_info";
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";
765
766impl ReconcileLogicalTableMetrics {
767 pub fn total_table_count(&self) -> usize {
769 self.create_tables_count
770 + self.column_metadata_consistent_count
771 + self.column_metadata_inconsistent_count
772 }
773}
774
775impl Drop for ReconcileLogicalTableMetrics {
776 fn drop(&mut self) {
777 let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
778 metrics::METRIC_META_RECONCILIATION_STATS
779 .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
780 .inc_by(self.create_tables_count as u64);
781 metrics::METRIC_META_RECONCILIATION_STATS
782 .with_label_values(&[
783 procedure_name,
784 metrics::TABLE_TYPE_LOGICAL,
785 UPDATE_TABLE_INFO,
786 ])
787 .inc_by(self.update_table_info_count as u64);
788 metrics::METRIC_META_RECONCILIATION_STATS
789 .with_label_values(&[
790 procedure_name,
791 metrics::TABLE_TYPE_LOGICAL,
792 COLUMN_METADATA_CONSISTENT,
793 ])
794 .inc_by(self.column_metadata_consistent_count as u64);
795 metrics::METRIC_META_RECONCILIATION_STATS
796 .with_label_values(&[
797 procedure_name,
798 metrics::TABLE_TYPE_LOGICAL,
799 COLUMN_METADATA_INCONSISTENT,
800 ])
801 .inc_by(self.column_metadata_inconsistent_count as u64);
802 }
803}
804
805impl Display for ReconcileLogicalTableMetrics {
806 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
807 let elapsed = self.start_time.elapsed();
808 if self.create_tables_count > 0 {
809 write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
810 }
811 if self.update_table_info_count > 0 {
812 write!(
813 f,
814 "update_table_info_count: {}, ",
815 self.update_table_info_count
816 )?;
817 }
818 if self.column_metadata_consistent_count > 0 {
819 write!(
820 f,
821 "column_metadata_consistent_count: {}, ",
822 self.column_metadata_consistent_count
823 )?;
824 }
825 if self.column_metadata_inconsistent_count > 0 {
826 write!(
827 f,
828 "column_metadata_inconsistent_count: {}, ",
829 self.column_metadata_inconsistent_count
830 )?;
831 }
832
833 write!(
834 f,
835 "total_table_count: {}, elapsed: {:?}",
836 self.total_table_count(),
837 elapsed
838 )
839 }
840}
841
/// Outcome of resolving column metadata.
#[derive(Clone, Copy)]
pub enum ResolveColumnMetadataResult {
    /// The column metadata was consistent.
    Consistent,
    /// The column metadata was inconsistent; carries the [`ResolveStrategy`] involved.
    Inconsistent(ResolveStrategy),
}
848
849impl Display for ResolveColumnMetadataResult {
850 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
851 match self {
852 ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
853 ResolveColumnMetadataResult::Inconsistent(strategy) => {
854 let strategy_str = strategy.as_ref();
855 write!(f, "Inconsistent({})", strategy_str)
856 }
857 }
858 }
859}
860
/// Metrics collected while reconciling a table.
#[derive(Clone)]
pub struct ReconcileTableMetrics {
    /// Instant at which the metrics were created.
    pub start_time: Instant,
    /// Result of resolving column metadata; `None` until one is recorded.
    pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
    /// Whether the table info was updated.
    pub update_table_info: bool,
}
871
872impl Drop for ReconcileTableMetrics {
873 fn drop(&mut self) {
874 if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
875 match resolve_column_metadata_result {
876 ResolveColumnMetadataResult::Consistent => {
877 metrics::METRIC_META_RECONCILIATION_STATS
878 .with_label_values(&[
879 ReconcileTableProcedure::TYPE_NAME,
880 metrics::TABLE_TYPE_PHYSICAL,
881 COLUMN_METADATA_CONSISTENT,
882 ])
883 .inc();
884 }
885 ResolveColumnMetadataResult::Inconsistent(strategy) => {
886 metrics::METRIC_META_RECONCILIATION_STATS
887 .with_label_values(&[
888 ReconcileTableProcedure::TYPE_NAME,
889 metrics::TABLE_TYPE_PHYSICAL,
890 COLUMN_METADATA_INCONSISTENT,
891 ])
892 .inc();
893 metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
894 .with_label_values(&[strategy.as_ref()])
895 .inc();
896 }
897 }
898 }
899 if self.update_table_info {
900 metrics::METRIC_META_RECONCILIATION_STATS
901 .with_label_values(&[
902 ReconcileTableProcedure::TYPE_NAME,
903 metrics::TABLE_TYPE_PHYSICAL,
904 UPDATE_TABLE_INFO,
905 ])
906 .inc();
907 }
908 }
909}
910
911impl Default for ReconcileTableMetrics {
912 fn default() -> Self {
913 Self {
914 start_time: Instant::now(),
915 resolve_column_metadata_result: None,
916 update_table_info: false,
917 }
918 }
919}
920
921impl Display for ReconcileTableMetrics {
922 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
923 let elapsed = self.start_time.elapsed();
924 if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
925 write!(
926 f,
927 "resolve_column_metadata_result: {}, ",
928 resolve_column_metadata_result
929 )?;
930 }
931 write!(
932 f,
933 "update_table_info: {}, elapsed: {:?}",
934 self.update_table_info, elapsed
935 )
936 }
937}
938
939#[cfg(test)]
940mod tests {
941 use std::assert_matches::assert_matches;
942 use std::collections::HashMap;
943 use std::sync::Arc;
944
945 use api::v1::SemanticType;
946 use datatypes::prelude::ConcreteDataType;
947 use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
948 use store_api::metadata::ColumnMetadata;
949 use store_api::storage::RegionId;
950 use table::metadata::{RawTableMeta, TableMetaBuilder};
951 use table::table_reference::TableReference;
952
953 use super::*;
954 use crate::ddl::test_util::region_metadata::build_region_metadata;
955 use crate::error::Error;
956 use crate::reconciliation::utils::check_column_metadatas_consistent;
957
958 fn new_test_schema() -> Schema {
959 let column_schemas = vec![
960 ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
961 ColumnSchema::new(
962 "ts",
963 ConcreteDataType::timestamp_millisecond_datatype(),
964 false,
965 )
966 .with_time_index(true),
967 ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
968 ];
969 SchemaBuilder::try_from(column_schemas)
970 .unwrap()
971 .version(123)
972 .build()
973 .unwrap()
974 }
975
976 fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
977 vec![
978 ColumnMetadata {
979 column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
980 semantic_type: SemanticType::Tag,
981 column_id: 0,
982 },
983 ColumnMetadata {
984 column_schema: ColumnSchema::new(
985 "ts",
986 ConcreteDataType::timestamp_millisecond_datatype(),
987 false,
988 )
989 .with_time_index(true),
990 semantic_type: SemanticType::Timestamp,
991 column_id: 1,
992 },
993 ColumnMetadata {
994 column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
995 semantic_type: SemanticType::Field,
996 column_id: 2,
997 },
998 ]
999 }
1000
1001 fn new_test_raw_table_info() -> RawTableMeta {
1002 let mut table_meta_builder = TableMetaBuilder::empty();
1003 let table_meta = table_meta_builder
1004 .schema(Arc::new(new_test_schema()))
1005 .primary_key_indices(vec![0])
1006 .partition_key_indices(vec![2])
1007 .next_column_id(4)
1008 .build()
1009 .unwrap();
1010
1011 table_meta.into()
1012 }
1013
1014 #[test]
1015 fn test_build_table_info_from_column_metadatas_identical() {
1016 let column_metadatas = new_test_column_metadatas();
1017 let table_id = 1;
1018 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1019 let mut table_meta = new_test_raw_table_info();
1020 table_meta.column_ids = vec![0, 1, 2];
1021 let name_to_ids = HashMap::from([
1022 ("col1".to_string(), 0),
1023 ("ts".to_string(), 1),
1024 ("col2".to_string(), 2),
1025 ]);
1026
1027 let new_table_meta = build_table_meta_from_column_metadatas(
1028 table_id,
1029 table_ref,
1030 &table_meta,
1031 Some(name_to_ids),
1032 &column_metadatas,
1033 )
1034 .unwrap();
1035 assert_eq!(new_table_meta, table_meta);
1036 }
1037
1038 #[test]
1039 fn test_build_table_info_from_column_metadatas() {
1040 let mut column_metadatas = new_test_column_metadatas();
1041 column_metadatas.push(ColumnMetadata {
1042 column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true),
1043 semantic_type: SemanticType::Tag,
1044 column_id: 3,
1045 });
1046
1047 let table_id = 1;
1048 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1049 let table_meta = new_test_raw_table_info();
1050 let name_to_ids = HashMap::from([
1051 ("col1".to_string(), 0),
1052 ("ts".to_string(), 1),
1053 ("col2".to_string(), 2),
1054 ]);
1055
1056 let new_table_meta = build_table_meta_from_column_metadatas(
1057 table_id,
1058 table_ref,
1059 &table_meta,
1060 Some(name_to_ids),
1061 &column_metadatas,
1062 )
1063 .unwrap();
1064
1065 assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
1066 assert_eq!(new_table_meta.partition_key_indices, vec![2]);
1067 assert_eq!(new_table_meta.value_indices, vec![1, 2]);
1068 assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
1069 assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
1070 assert_eq!(new_table_meta.next_column_id, 4);
1071 }
1072
1073 #[test]
1074 fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
1075 let column_metadatas = new_test_column_metadatas();
1076 let table_id = 1;
1077 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1078 let table_meta = new_test_raw_table_info();
1079 let name_to_ids = HashMap::from([
1080 ("col1".to_string(), 0),
1081 ("ts".to_string(), 1),
1082 ("col2".to_string(), 3),
1084 ]);
1085
1086 let err = build_table_meta_from_column_metadatas(
1087 table_id,
1088 table_ref,
1089 &table_meta,
1090 Some(name_to_ids),
1091 &column_metadatas,
1092 )
1093 .unwrap_err();
1094
1095 assert_matches!(err, Error::MismatchColumnId { .. });
1096 }
1097
1098 #[test]
1099 fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
1100 let mut column_metadatas = new_test_column_metadatas();
1101 column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1102 let table_id = 1;
1103 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1104 let table_meta = new_test_raw_table_info();
1105 let name_to_ids = HashMap::from([
1106 ("col1".to_string(), 0),
1107 ("ts".to_string(), 1),
1108 ("col2".to_string(), 2),
1109 ]);
1110
1111 let err = build_table_meta_from_column_metadatas(
1112 table_id,
1113 table_ref,
1114 &table_meta,
1115 Some(name_to_ids),
1116 &column_metadatas,
1117 )
1118 .unwrap_err();
1119
1120 assert!(
1121 err.to_string()
1122 .contains("Missing table index in column metadata"),
1123 "err: {}",
1124 err
1125 );
1126 }
1127
1128 #[test]
1129 fn test_build_table_info_from_column_metadatas_with_missing_column() {
1130 let mut column_metadatas = new_test_column_metadatas();
1131 column_metadatas.retain(|c| c.column_id != 0);
1133 let table_id = 1;
1134 let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1135 let table_meta = new_test_raw_table_info();
1136 let name_to_ids = HashMap::from([
1137 ("col1".to_string(), 0),
1138 ("ts".to_string(), 1),
1139 ("col2".to_string(), 2),
1140 ]);
1141
1142 let err = build_table_meta_from_column_metadatas(
1143 table_id,
1144 table_ref,
1145 &table_meta,
1146 Some(name_to_ids.clone()),
1147 &column_metadatas,
1148 )
1149 .unwrap_err();
1150 assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1151
1152 let mut column_metadatas = new_test_column_metadatas();
1153 column_metadatas.retain(|c| c.column_id != 2);
1155
1156 let err = build_table_meta_from_column_metadatas(
1157 table_id,
1158 table_ref,
1159 &table_meta,
1160 Some(name_to_ids),
1161 &column_metadatas,
1162 )
1163 .unwrap_err();
1164 assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1165 }
1166
1167 #[test]
1168 fn test_check_column_metadatas_consistent() {
1169 let column_metadatas = new_test_column_metadatas();
1170 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1171 let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1172 let result =
1173 check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
1174 assert_eq!(result, column_metadatas);
1175
1176 let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
1177 let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1178 let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
1179 assert!(result.is_none());
1180 }
1181
1182 #[test]
1183 fn test_check_column_metadata_invariants() {
1184 let column_metadatas = new_test_column_metadatas();
1185 let mut new_column_metadatas = column_metadatas.clone();
1186 new_column_metadatas.push(ColumnMetadata {
1187 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1188 semantic_type: SemanticType::Field,
1189 column_id: 3,
1190 });
1191 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
1192 }
1193
1194 #[test]
1195 fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
1196 let column_metadatas = new_test_column_metadatas();
1197 let mut new_column_metadatas = column_metadatas.clone();
1198 new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1199 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1200
1201 let column_metadatas = new_test_column_metadatas();
1202 let mut new_column_metadatas = column_metadatas.clone();
1203 new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
1204 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1205 }
1206
1207 #[test]
1208 fn test_check_column_metadata_invariants_mismatch_column_id() {
1209 let column_metadatas = new_test_column_metadatas();
1210 let mut new_column_metadatas = column_metadatas.clone();
1211 if let Some(col) = new_column_metadatas
1212 .iter_mut()
1213 .find(|c| c.semantic_type == SemanticType::Timestamp)
1214 {
1215 col.column_id = 100;
1216 }
1217 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1218
1219 let column_metadatas = new_test_column_metadatas();
1220 let mut new_column_metadatas = column_metadatas.clone();
1221 if let Some(col) = new_column_metadatas
1222 .iter_mut()
1223 .find(|c| c.semantic_type == SemanticType::Tag)
1224 {
1225 col.column_id = 100;
1226 }
1227 check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1228 }
1229
1230 #[test]
1231 fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
1232 let column_metadatas = new_test_column_metadatas();
1233 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1234 let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
1235 metasrv_column_metadatas.push(ColumnMetadata {
1236 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1237 semantic_type: SemanticType::Field,
1238 column_id: 3,
1239 });
1240 let result =
1241 resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
1242 .unwrap();
1243
1244 assert_eq!(result, vec![RegionId::new(1024, 0)]);
1245 }
1246
1247 #[test]
1248 fn test_resolve_column_metadatas_with_use_latest_strategy() {
1249 let column_metadatas = new_test_column_metadatas();
1250 let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1251 let mut new_column_metadatas = column_metadatas.clone();
1252 new_column_metadatas.push(ColumnMetadata {
1253 column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1254 semantic_type: SemanticType::Field,
1255 column_id: 3,
1256 });
1257
1258 let mut region_metadata2 =
1259 build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
1260 region_metadata2.schema_version = 2;
1261
1262 let (resolved_column_metadatas, region_ids) =
1263 resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
1264 assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
1265 assert_eq!(resolved_column_metadatas, new_column_metadatas);
1266 }
1267}