common_meta/reconciliation/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display};
17use std::ops::AddAssign;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::v1::SemanticType;
22use common_procedure::{Context as ProcedureContext, ProcedureId, watcher};
23use common_telemetry::{error, warn};
24use datatypes::schema::{ColumnSchema, Schema};
25use futures::future::{join_all, try_join_all};
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::metadata::{ColumnMetadata, RegionMetadata};
28use store_api::storage::consts::ReservedColumnId;
29use store_api::storage::{RegionId, TableId};
30use table::metadata::{TableInfo, TableMeta};
31use table::table_name::TableName;
32use table::table_reference::TableReference;
33
34use crate::cache_invalidator::CacheInvalidatorRef;
35use crate::error::{
36    ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
37    MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
38    ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
39    WaitProcedureSnafu,
40};
41use crate::key::TableMetadataManagerRef;
42use crate::metrics;
43use crate::node_manager::NodeManagerRef;
44use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
45use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
46use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
47
48#[derive(Debug, PartialEq, Eq)]
49pub(crate) struct PartialRegionMetadata<'a> {
50    pub(crate) column_metadatas: &'a [ColumnMetadata],
51    pub(crate) primary_key: &'a [u32],
52    pub(crate) table_id: TableId,
53}
54
55impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
56    fn from(region_metadata: &'a RegionMetadata) -> Self {
57        Self {
58            column_metadatas: &region_metadata.column_metadatas,
59            primary_key: &region_metadata.primary_key,
60            table_id: region_metadata.region_id.table_id(),
61        }
62    }
63}
64
65/// Checks if the column metadatas are consistent.
66///
67/// The column metadatas are consistent if:
68/// - The column metadatas are the same.
69/// - The primary key are the same.
70/// - The table id of the region metadatas are the same.
71///
72/// ## Panic
73/// Panic if region_metadatas is empty.
74pub(crate) fn check_column_metadatas_consistent(
75    region_metadatas: &[RegionMetadata],
76) -> Option<Vec<ColumnMetadata>> {
77    let is_column_metadata_consistent = region_metadatas
78        .windows(2)
79        .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
80
81    if !is_column_metadata_consistent {
82        return None;
83    }
84
85    Some(region_metadatas[0].column_metadatas.clone())
86}
87
88/// Resolves column metadata inconsistencies among the given region metadatas
89/// by using the column metadata from the metasrv as the source of truth.
90///
91/// All region metadatas whose column metadata differs from the given `column_metadatas`
92/// will be marked for reconciliation.
93///
94/// Returns the region ids that need to be reconciled.
95pub(crate) fn resolve_column_metadatas_with_metasrv(
96    column_metadatas: &[ColumnMetadata],
97    region_metadatas: &[RegionMetadata],
98) -> Result<Vec<RegionId>> {
99    let is_same_table = region_metadatas
100        .windows(2)
101        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
102
103    ensure!(
104        is_same_table,
105        UnexpectedSnafu {
106            err_msg: "Region metadatas are not from the same table"
107        }
108    );
109
110    let mut regions_ids = vec![];
111    for region_metadata in region_metadatas {
112        if region_metadata.column_metadatas != column_metadatas {
113            check_column_metadata_invariants(column_metadatas, &region_metadata.column_metadatas)?;
114            regions_ids.push(region_metadata.region_id);
115        }
116    }
117    Ok(regions_ids)
118}
119
120/// Resolves column metadata inconsistencies among the given region metadatas
121/// by selecting the column metadata with the highest schema version.
122///
123/// This strategy assumes that at most two versions of column metadata may exist,
124/// due to the poison mechanism, making the highest schema version a safe choice.
125///
126/// Returns the resolved column metadata and the region ids that need to be reconciled.
127pub(crate) fn resolve_column_metadatas_with_latest(
128    region_metadatas: &[RegionMetadata],
129) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
130    let is_same_table = region_metadatas
131        .windows(2)
132        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
133
134    ensure!(
135        is_same_table,
136        UnexpectedSnafu {
137            err_msg: "Region metadatas are not from the same table"
138        }
139    );
140
141    let latest_region_metadata = region_metadatas
142        .iter()
143        .max_by_key(|c| c.schema_version)
144        .context(UnexpectedSnafu {
145            err_msg: "All Region metadatas have the same schema version",
146        })?;
147    let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
148
149    let mut region_ids = vec![];
150    for region_metadata in region_metadatas {
151        if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
152            check_column_metadata_invariants(
153                &latest_region_metadata.column_metadatas,
154                &region_metadata.column_metadatas,
155            )?;
156            region_ids.push(region_metadata.region_id);
157        }
158    }
159
160    // TODO(weny): verify the new column metadatas are acceptable for regions.
161    Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
162}
163
164/// Constructs a vector of [`ColumnMetadata`] from the provided table information.
165///
166/// This function maps each [`ColumnSchema`] to its corresponding [`ColumnMetadata`] by
167/// determining the semantic type (Tag, Timestamp, or Field) and retrieving the column ID
168/// from the `name_to_ids` mapping.
169///
170/// Returns an error if any column name is missing in the mapping.
171pub(crate) fn build_column_metadata_from_table_info(
172    column_schemas: &[ColumnSchema],
173    primary_key_indexes: &[usize],
174    name_to_ids: &HashMap<String, u32>,
175) -> Result<Vec<ColumnMetadata>> {
176    let primary_names = primary_key_indexes
177        .iter()
178        .map(|i| column_schemas[*i].name.as_str())
179        .collect::<HashSet<_>>();
180
181    column_schemas
182        .iter()
183        .map(|column_schema| {
184            let column_id = *name_to_ids
185                .get(column_schema.name.as_str())
186                .with_context(|| UnexpectedSnafu {
187                    err_msg: format!(
188                        "Column name {} not found in name_to_ids",
189                        column_schema.name
190                    ),
191                })?;
192
193            let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
194                SemanticType::Tag
195            } else if column_schema.is_time_index() {
196                SemanticType::Timestamp
197            } else {
198                SemanticType::Field
199            };
200            Ok(ColumnMetadata {
201                column_schema: column_schema.clone(),
202                semantic_type,
203                column_id,
204            })
205        })
206        .collect::<Result<Vec<_>>>()
207}
208
209/// Checks whether the schema invariants hold between the existing and new column metadata.
210///
211/// Invariants:
212/// - Primary key (Tag) columns must exist in the new metadata, with identical name and ID.
213/// - Timestamp column must remain exactly the same in name and ID.
214pub(crate) fn check_column_metadata_invariants(
215    new_column_metadatas: &[ColumnMetadata],
216    column_metadatas: &[ColumnMetadata],
217) -> Result<()> {
218    let new_primary_keys = new_column_metadatas
219        .iter()
220        .filter(|c| c.semantic_type == SemanticType::Tag)
221        .map(|c| (c.column_schema.name.as_str(), c.column_id))
222        .collect::<HashMap<_, _>>();
223
224    let old_primary_keys = column_metadatas
225        .iter()
226        .filter(|c| c.semantic_type == SemanticType::Tag)
227        .map(|c| (c.column_schema.name.as_str(), c.column_id));
228
229    for (name, id) in old_primary_keys {
230        let column_id = new_primary_keys
231            .get(name)
232            .cloned()
233            .context(ColumnNotFoundSnafu {
234                column_name: name,
235                column_id: id,
236            })?;
237
238        ensure!(
239            column_id == id,
240            ColumnIdMismatchSnafu {
241                column_name: name,
242                expected_column_id: id,
243                actual_column_id: column_id,
244            }
245        );
246    }
247
248    let new_ts_column = new_column_metadatas
249        .iter()
250        .find(|c| c.semantic_type == SemanticType::Timestamp)
251        .map(|c| (c.column_schema.name.as_str(), c.column_id))
252        .context(UnexpectedSnafu {
253            err_msg: "Timestamp column not found in new column metadata",
254        })?;
255
256    let old_ts_column = column_metadatas
257        .iter()
258        .find(|c| c.semantic_type == SemanticType::Timestamp)
259        .map(|c| (c.column_schema.name.as_str(), c.column_id))
260        .context(UnexpectedSnafu {
261            err_msg: "Timestamp column not found in column metadata",
262        })?;
263    ensure!(
264        new_ts_column == old_ts_column,
265        TimestampMismatchSnafu {
266            expected_column_name: old_ts_column.0,
267            expected_column_id: old_ts_column.1,
268            actual_column_name: new_ts_column.0,
269            actual_column_id: new_ts_column.1,
270        }
271    );
272
273    Ok(())
274}
275
276/// Builds a [`TableMeta`] from the provided [`ColumnMetadata`]s.
277///
278/// Returns an error if:
279/// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided).
280/// - The column id in table metadata is not the same as the column id in the column metadata.(if `name_to_ids` is provided)
281/// - The table index is missing in the column metadata.
282/// - The primary key or partition key columns are missing in the column metadata.
283///
284/// TODO(weny): add tests
285pub(crate) fn build_table_meta_from_column_metadatas(
286    table_id: TableId,
287    table_ref: TableReference,
288    table_meta: &TableMeta,
289    name_to_ids: Option<HashMap<String, u32>>,
290    column_metadata: &[ColumnMetadata],
291) -> Result<TableMeta> {
292    let column_in_column_metadata = column_metadata
293        .iter()
294        .map(|c| (c.column_schema.name.as_str(), c))
295        .collect::<HashMap<_, _>>();
296    let column_schemas = table_meta.schema.column_schemas();
297    let primary_key_names = table_meta
298        .primary_key_indices
299        .iter()
300        .map(|i| column_schemas[*i].name.as_str())
301        .collect::<HashSet<_>>();
302    let partition_key_names = table_meta
303        .partition_key_indices
304        .iter()
305        .map(|i| column_schemas[*i].name.as_str())
306        .collect::<HashSet<_>>();
307    ensure!(
308        column_metadata
309            .iter()
310            .any(|c| c.semantic_type == SemanticType::Timestamp),
311        UnexpectedSnafu {
312            err_msg: format!(
313                "Missing table index in column metadata, table: {}, table_id: {}",
314                table_ref, table_id
315            ),
316        }
317    );
318
319    if let Some(name_to_ids) = &name_to_ids {
320        // Ensures all primary key and partition key exists in the column metadata.
321        for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
322            let column_in_column_metadata = column_in_column_metadata
323                .get(column_name)
324                .with_context(|| MissingColumnInColumnMetadataSnafu {
325                    column_name: column_name.to_string(),
326                    table_name: table_ref.to_string(),
327                    table_id,
328                })?;
329
330            let column_id = *name_to_ids
331                .get(*column_name)
332                .with_context(|| UnexpectedSnafu {
333                    err_msg: format!("column id not found in name_to_ids: {}", column_name),
334                })?;
335            ensure!(
336                column_id == column_in_column_metadata.column_id,
337                MismatchColumnIdSnafu {
338                    column_name: column_name.to_string(),
339                    column_id,
340                    table_name: table_ref.to_string(),
341                    table_id,
342                }
343            );
344        }
345    } else {
346        warn!(
347            "`name_to_ids` is not provided, table: {}, table_id: {}",
348            table_ref, table_id
349        );
350    }
351
352    let mut new_raw_table_meta = table_meta.clone();
353    let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
354    let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
355    let value_indices = &mut new_raw_table_meta.value_indices;
356    let mut columns = Vec::with_capacity(column_metadata.len());
357    let column_ids = &mut new_raw_table_meta.column_ids;
358    let next_column_id = &mut new_raw_table_meta.next_column_id;
359
360    column_ids.clear();
361    value_indices.clear();
362    primary_key_indices.clear();
363    partition_key_indices.clear();
364
365    for (idx, col) in column_metadata.iter().enumerate() {
366        if partition_key_names.contains(&col.column_schema.name.as_str()) {
367            partition_key_indices.push(idx);
368        }
369        match col.semantic_type {
370            SemanticType::Tag => {
371                primary_key_indices.push(idx);
372            }
373            SemanticType::Field => {
374                value_indices.push(idx);
375            }
376            SemanticType::Timestamp => {
377                value_indices.push(idx);
378            }
379        }
380
381        columns.push(col.column_schema.clone());
382        column_ids.push(col.column_id);
383    }
384
385    *next_column_id = column_ids
386        .iter()
387        .filter(|id| !ReservedColumnId::is_reserved(**id))
388        .max()
389        .map(|max| max + 1)
390        .unwrap_or(*next_column_id)
391        .max(*next_column_id);
392
393    new_raw_table_meta.schema = Arc::new(Schema::new_with_version(
394        columns,
395        table_meta.schema.version(),
396    ));
397    Ok(new_raw_table_meta)
398}
399
400/// Returns true if the logical table info needs to be updated.
401///
402/// The logical table only support to add columns, so we can check the length of column metadatas
403/// to determine whether the logical table info needs to be updated.
404pub(crate) fn need_update_logical_table_info(
405    table_info: &TableInfo,
406    column_metadatas: &[ColumnMetadata],
407) -> bool {
408    table_info.meta.schema.column_schemas().len() != column_metadatas.len()
409}
410
/// The outcome of waiting for inflight subprocedures when some of them failed.
///
/// `wait_for_inflight_subprocedures` produces it (wrapped in
/// `WaitForInflightSubproceduresResult::PartialSuccess`) when `fail_fast` is
/// `false` and at least one subprocedure failed.
pub struct PartialSuccessResult<'a> {
    /// Subprocedures whose wait returned an error.
    pub failed_procedures: Vec<&'a SubprocedureMeta>,
    /// Subprocedures that completed successfully.
    pub success_procedures: Vec<&'a SubprocedureMeta>,
}
416
/// The result of waiting for inflight subprocedures.
pub enum WaitForInflightSubproceduresResult<'a> {
    /// Every subprocedure completed successfully; holds all of them.
    Success(Vec<&'a SubprocedureMeta>),
    /// Some subprocedures failed; the failed and succeeded ones are listed separately.
    PartialSuccess(PartialSuccessResult<'a>),
}
422
423/// Wait for inflight subprocedures.
424///
425/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
426/// Otherwise, the function will continue waiting for all subprocedures to complete.
427pub(crate) async fn wait_for_inflight_subprocedures<'a>(
428    procedure_ctx: &ProcedureContext,
429    subprocedures: &'a [SubprocedureMeta],
430    fail_fast: bool,
431) -> Result<WaitForInflightSubproceduresResult<'a>> {
432    let mut receivers = Vec::with_capacity(subprocedures.len());
433    for subprocedure in subprocedures {
434        let procedure_id = subprocedure.procedure_id();
435        let receiver = procedure_ctx
436            .provider
437            .procedure_state_receiver(procedure_id)
438            .await
439            .context(ProcedureStateReceiverSnafu { procedure_id })?
440            .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
441        receivers.push((receiver, subprocedure));
442    }
443
444    let mut tasks = Vec::with_capacity(receivers.len());
445    for (receiver, subprocedure) in receivers.iter_mut() {
446        tasks.push(async move {
447            watcher::wait(receiver).await.inspect_err(|e| {
448                error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
449            })
450        });
451    }
452
453    if fail_fast {
454        try_join_all(tasks).await.context(WaitProcedureSnafu)?;
455        return Ok(WaitForInflightSubproceduresResult::Success(
456            subprocedures.iter().collect(),
457        ));
458    }
459
460    // If fail_fast is false, we need to wait for all subprocedures to complete.
461    let results = join_all(tasks).await;
462    let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
463    if failed_procedures_num == 0 {
464        return Ok(WaitForInflightSubproceduresResult::Success(
465            subprocedures.iter().collect(),
466        ));
467    }
468    warn!(
469        "{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
470        failed_procedures_num,
471        subprocedures.len(),
472        procedure_ctx.procedure_id
473    );
474
475    let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
476    let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
477    for (result, subprocedure) in results.into_iter().zip(subprocedures) {
478        if result.is_err() {
479            failed_procedures.push(subprocedure);
480        } else {
481            success_procedures.push(subprocedure);
482        }
483    }
484
485    Ok(WaitForInflightSubproceduresResult::PartialSuccess(
486        PartialSuccessResult {
487            failed_procedures,
488            success_procedures,
489        },
490    ))
491}
492
/// Shared handles used by the reconciliation procedures.
///
/// NOTE(review): the `*Ref` handle types are presumably reference-counted, which
/// would make `clone` cheap. TODO confirm against their definitions.
#[derive(Clone)]
pub struct Context {
    /// Node manager handle.
    pub node_manager: NodeManagerRef,
    /// Table metadata manager handle.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator handle.
    pub cache_invalidator: CacheInvalidatorRef,
}
499
/// Metadata for an inflight physical table subprocedure.
pub struct PhysicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table being reconciled.
    pub table_id: TableId,
    /// Name of the physical table being reconciled.
    pub table_name: TableName,
}
506
/// Metadata for an inflight logical table subprocedure.
pub struct LogicalTableMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Id of the physical table the logical tables belong to.
    pub physical_table_id: TableId,
    /// Name of the physical table the logical tables belong to.
    pub physical_table_name: TableName,
    /// The logical tables handled by the subprocedure, as `(table id, table name)`.
    pub logical_tables: Vec<(TableId, TableName)>,
}
514
/// Metadata for an inflight database subprocedure.
pub struct ReconcileDatabaseMeta {
    /// Id of the subprocedure.
    pub procedure_id: ProcedureId,
    /// Catalog of the database being reconciled.
    pub catalog: String,
    /// Schema (database) name being reconciled.
    pub schema: String,
}
521
/// The inflight subprocedure metadata.
///
/// Identifies a reconciliation subprocedure and what it covers. This is used to
/// wait for the subprocedure and to aggregate metrics.
pub enum SubprocedureMeta {
    /// Reconciliation of a single physical table.
    PhysicalTable(PhysicalTableMeta),
    /// Reconciliation of logical tables on top of one physical table.
    LogicalTable(LogicalTableMeta),
    /// Reconciliation of a whole database.
    Database(ReconcileDatabaseMeta),
}
528
529impl Display for SubprocedureMeta {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        match self {
532            SubprocedureMeta::PhysicalTable(meta) => {
533                write!(
534                    f,
535                    "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
536                    meta.procedure_id, meta.table_id, meta.table_name
537                )
538            }
539            SubprocedureMeta::LogicalTable(meta) => {
540                write!(
541                    f,
542                    "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
543                    meta.procedure_id,
544                    meta.physical_table_id,
545                    meta.physical_table_name,
546                    meta.logical_tables
547                )
548            }
549            SubprocedureMeta::Database(meta) => {
550                write!(
551                    f,
552                    "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
553                    meta.procedure_id, meta.catalog, meta.schema
554                )
555            }
556        }
557    }
558}
559
560impl SubprocedureMeta {
561    /// Creates a new logical table subprocedure metadata.
562    pub fn new_logical_table(
563        procedure_id: ProcedureId,
564        physical_table_id: TableId,
565        physical_table_name: TableName,
566        logical_tables: Vec<(TableId, TableName)>,
567    ) -> Self {
568        Self::LogicalTable(LogicalTableMeta {
569            procedure_id,
570            physical_table_id,
571            physical_table_name,
572            logical_tables,
573        })
574    }
575
576    /// Creates a new physical table subprocedure metadata.
577    pub fn new_physical_table(
578        procedure_id: ProcedureId,
579        table_id: TableId,
580        table_name: TableName,
581    ) -> Self {
582        Self::PhysicalTable(PhysicalTableMeta {
583            procedure_id,
584            table_id,
585            table_name,
586        })
587    }
588
589    /// Creates a new reconcile database subprocedure metadata.
590    pub fn new_reconcile_database(
591        procedure_id: ProcedureId,
592        catalog: String,
593        schema: String,
594    ) -> Self {
595        Self::Database(ReconcileDatabaseMeta {
596            procedure_id,
597            catalog,
598            schema,
599        })
600    }
601
602    /// Returns the procedure id of the subprocedure.
603    pub fn procedure_id(&self) -> ProcedureId {
604        match self {
605            SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
606            SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
607            SubprocedureMeta::Database(meta) => meta.procedure_id,
608        }
609    }
610
611    /// Returns the number of tables will be reconciled.
612    pub fn table_num(&self) -> usize {
613        match self {
614            SubprocedureMeta::PhysicalTable(_) => 1,
615            SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
616            SubprocedureMeta::Database(_) => 0,
617        }
618    }
619
620    /// Returns the number of databases will be reconciled.
621    pub fn database_num(&self) -> usize {
622        match self {
623            SubprocedureMeta::Database(_) => 1,
624            _ => 0,
625        }
626    }
627}
628
/// Counters describing the outcome of reconciling a catalog.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
    /// Number of databases whose reconciliation succeeded.
    pub succeeded_databases: usize,
    /// Number of databases whose reconciliation failed.
    pub failed_databases: usize,
}

impl AddAssign for ReconcileCatalogMetrics {
    /// Accumulates the counters of `rhs` into `self`, field by field.
    fn add_assign(&mut self, rhs: Self) {
        let Self {
            succeeded_databases,
            failed_databases,
        } = rhs;
        self.succeeded_databases += succeeded_databases;
        self.failed_databases += failed_databases;
    }
}

impl Display for ReconcileCatalogMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_databases,
            failed_databases,
        } = self;
        write!(
            f,
            "succeeded_databases: {}, failed_databases: {}",
            succeeded_databases, failed_databases
        )
    }
}
652
653impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
654    fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
655        match result {
656            WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
657                succeeded_databases: subprocedures.len(),
658                failed_databases: 0,
659            },
660            WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
661                failed_procedures,
662                success_procedures,
663            }) => {
664                let succeeded_databases = success_procedures
665                    .iter()
666                    .map(|subprocedure| subprocedure.database_num())
667                    .sum();
668                let failed_databases = failed_procedures
669                    .iter()
670                    .map(|subprocedure| subprocedure.database_num())
671                    .sum();
672                ReconcileCatalogMetrics {
673                    succeeded_databases,
674                    failed_databases,
675                }
676            }
677        }
678    }
679}
680
/// Counters describing the outcome of reconciling a database.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
    /// Number of tables covered by subprocedures that succeeded.
    pub succeeded_tables: usize,
    /// Number of tables covered by subprocedures that failed.
    pub failed_tables: usize,
    /// Number of subprocedures that succeeded.
    pub succeeded_procedures: usize,
    /// Number of subprocedures that failed.
    pub failed_procedures: usize,
}

impl AddAssign for ReconcileDatabaseMetrics {
    /// Accumulates the counters of `rhs` into `self`, field by field.
    fn add_assign(&mut self, rhs: Self) {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = rhs;
        self.succeeded_tables += succeeded_tables;
        self.failed_tables += failed_tables;
        self.succeeded_procedures += succeeded_procedures;
        self.failed_procedures += failed_procedures;
    }
}

impl Display for ReconcileDatabaseMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = self;
        write!(
            f,
            "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}",
            succeeded_tables, failed_tables, succeeded_procedures, failed_procedures
        )
    }
}
711
712impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
713    fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
714        match result {
715            WaitForInflightSubproceduresResult::Success(subprocedures) => {
716                let table_num = subprocedures
717                    .iter()
718                    .map(|subprocedure| subprocedure.table_num())
719                    .sum();
720                ReconcileDatabaseMetrics {
721                    succeeded_procedures: subprocedures.len(),
722                    failed_procedures: 0,
723                    succeeded_tables: table_num,
724                    failed_tables: 0,
725                }
726            }
727            WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
728                failed_procedures,
729                success_procedures,
730            }) => {
731                let succeeded_tables = success_procedures
732                    .iter()
733                    .map(|subprocedure| subprocedure.table_num())
734                    .sum();
735                let failed_tables = failed_procedures
736                    .iter()
737                    .map(|subprocedure| subprocedure.table_num())
738                    .sum();
739                ReconcileDatabaseMetrics {
740                    succeeded_procedures: success_procedures.len(),
741                    failed_procedures: failed_procedures.len(),
742                    succeeded_tables,
743                    failed_tables,
744                }
745            }
746        }
747    }
748}
749
/// Counters describing the outcome of reconciling logical tables.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
    /// When the reconciliation started; used to report the elapsed time.
    pub start_time: Instant,
    /// Number of table infos that were updated.
    pub update_table_info_count: usize,
    /// Number of tables that were created.
    pub create_tables_count: usize,
    /// Number of tables whose column metadata was already consistent.
    pub column_metadata_consistent_count: usize,
    /// Number of tables whose column metadata was inconsistent.
    pub column_metadata_inconsistent_count: usize,
}

impl Default for ReconcileLogicalTableMetrics {
    /// Starts the clock now, with every counter at zero.
    fn default() -> Self {
        let start_time = Instant::now();
        Self {
            start_time,
            create_tables_count: 0,
            update_table_info_count: 0,
            column_metadata_consistent_count: 0,
            column_metadata_inconsistent_count: 0,
        }
    }
}

/// Metric label for created tables.
const CREATE_TABLES: &str = "create_tables";
/// Metric label for updated table infos.
const UPDATE_TABLE_INFO: &str = "update_table_info";
/// Metric label for tables with consistent column metadata.
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
/// Metric label for tables with inconsistent column metadata.
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";

impl ReconcileLogicalTableMetrics {
    /// Returns the total number of tables that have been reconciled.
    ///
    /// `update_table_info_count` is not part of the sum.
    pub fn total_table_count(&self) -> usize {
        [
            self.create_tables_count,
            self.column_metadata_consistent_count,
            self.column_metadata_inconsistent_count,
        ]
        .into_iter()
        .sum()
    }
}
785
786impl Drop for ReconcileLogicalTableMetrics {
787    fn drop(&mut self) {
788        let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
789        metrics::METRIC_META_RECONCILIATION_STATS
790            .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
791            .inc_by(self.create_tables_count as u64);
792        metrics::METRIC_META_RECONCILIATION_STATS
793            .with_label_values(&[
794                procedure_name,
795                metrics::TABLE_TYPE_LOGICAL,
796                UPDATE_TABLE_INFO,
797            ])
798            .inc_by(self.update_table_info_count as u64);
799        metrics::METRIC_META_RECONCILIATION_STATS
800            .with_label_values(&[
801                procedure_name,
802                metrics::TABLE_TYPE_LOGICAL,
803                COLUMN_METADATA_CONSISTENT,
804            ])
805            .inc_by(self.column_metadata_consistent_count as u64);
806        metrics::METRIC_META_RECONCILIATION_STATS
807            .with_label_values(&[
808                procedure_name,
809                metrics::TABLE_TYPE_LOGICAL,
810                COLUMN_METADATA_INCONSISTENT,
811            ])
812            .inc_by(self.column_metadata_inconsistent_count as u64);
813    }
814}
815
816impl Display for ReconcileLogicalTableMetrics {
817    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
818        let elapsed = self.start_time.elapsed();
819        if self.create_tables_count > 0 {
820            write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
821        }
822        if self.update_table_info_count > 0 {
823            write!(
824                f,
825                "update_table_info_count: {}, ",
826                self.update_table_info_count
827            )?;
828        }
829        if self.column_metadata_consistent_count > 0 {
830            write!(
831                f,
832                "column_metadata_consistent_count: {}, ",
833                self.column_metadata_consistent_count
834            )?;
835        }
836        if self.column_metadata_inconsistent_count > 0 {
837            write!(
838                f,
839                "column_metadata_inconsistent_count: {}, ",
840                self.column_metadata_inconsistent_count
841            )?;
842        }
843
844        write!(
845            f,
846            "total_table_count: {}, elapsed: {:?}",
847            self.total_table_count(),
848            elapsed
849        )
850    }
851}
852
853/// The result of resolving column metadata.
854#[derive(Clone, Copy)]
855pub enum ResolveColumnMetadataResult {
856    Consistent,
857    Inconsistent(ResolveStrategy),
858}
859
860impl Display for ResolveColumnMetadataResult {
861    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
862        match self {
863            ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
864            ResolveColumnMetadataResult::Inconsistent(strategy) => {
865                let strategy_str = strategy.as_ref();
866                write!(f, "Inconsistent({})", strategy_str)
867            }
868        }
869    }
870}
871
872/// The metrics of reconciling physical tables.
873#[derive(Clone)]
874pub struct ReconcileTableMetrics {
875    /// The start time of the reconciliation.
876    pub start_time: Instant,
877    /// The result of resolving column metadata.
878    pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
879    /// Whether the table info has been updated.
880    pub update_table_info: bool,
881}
882
883impl Drop for ReconcileTableMetrics {
884    fn drop(&mut self) {
885        if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
886            match resolve_column_metadata_result {
887                ResolveColumnMetadataResult::Consistent => {
888                    metrics::METRIC_META_RECONCILIATION_STATS
889                        .with_label_values(&[
890                            ReconcileTableProcedure::TYPE_NAME,
891                            metrics::TABLE_TYPE_PHYSICAL,
892                            COLUMN_METADATA_CONSISTENT,
893                        ])
894                        .inc();
895                }
896                ResolveColumnMetadataResult::Inconsistent(strategy) => {
897                    metrics::METRIC_META_RECONCILIATION_STATS
898                        .with_label_values(&[
899                            ReconcileTableProcedure::TYPE_NAME,
900                            metrics::TABLE_TYPE_PHYSICAL,
901                            COLUMN_METADATA_INCONSISTENT,
902                        ])
903                        .inc();
904                    metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
905                        .with_label_values(&[strategy.as_ref()])
906                        .inc();
907                }
908            }
909        }
910        if self.update_table_info {
911            metrics::METRIC_META_RECONCILIATION_STATS
912                .with_label_values(&[
913                    ReconcileTableProcedure::TYPE_NAME,
914                    metrics::TABLE_TYPE_PHYSICAL,
915                    UPDATE_TABLE_INFO,
916                ])
917                .inc();
918        }
919    }
920}
921
922impl Default for ReconcileTableMetrics {
923    fn default() -> Self {
924        Self {
925            start_time: Instant::now(),
926            resolve_column_metadata_result: None,
927            update_table_info: false,
928        }
929    }
930}
931
932impl Display for ReconcileTableMetrics {
933    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
934        let elapsed = self.start_time.elapsed();
935        if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
936            write!(
937                f,
938                "resolve_column_metadata_result: {}, ",
939                resolve_column_metadata_result
940            )?;
941        }
942        write!(
943            f,
944            "update_table_info: {}, elapsed: {:?}",
945            self.update_table_info, elapsed
946        )
947    }
948}
949
#[cfg(test)]
mod tests {
    // Unit tests for the reconciliation helpers in this file. They cover:
    // - building a `TableMeta` from region column metadata;
    // - checking column-metadata consistency and invariants;
    // - resolving inconsistent column metadata, both against metasrv and by
    //   taking the latest region.
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use store_api::metadata::ColumnMetadata;
    use store_api::storage::RegionId;
    use table::metadata::TableMetaBuilder;
    use table::table_reference::TableReference;

    use super::*;
    use crate::ddl::test_util::region_metadata::build_region_metadata;
    use crate::error::Error;
    use crate::reconciliation::utils::check_column_metadatas_consistent;

    /// Builds a three-column schema with version 123: nullable int32 `col1`, a
    /// non-null millisecond timestamp `ts` used as the time index, and nullable
    /// int32 `col2`.
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Column metadata mirroring `new_test_schema`: `col1` is a tag (id 0), `ts`
    /// the timestamp (id 1) and `col2` a field (id 2).
    fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            },
        ]
    }

    /// Builds a `TableMeta` over `new_test_schema` with `col1` (index 0) as the
    /// primary key, `col2` (index 2) as the partition key, and `next_column_id` 4.
    ///
    /// NOTE(review): despite its name, this returns a `TableMeta`, not a raw table info.
    fn new_test_raw_table_info() -> TableMeta {
        let mut table_meta_builder = TableMetaBuilder::empty();
        table_meta_builder
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .partition_key_indices(vec![2])
            .next_column_id(4)
            .build()
            .unwrap()
    }

    /// When the column metadata already matches the table meta (same names and
    /// ids), rebuilding must return an identical `TableMeta`.
    #[test]
    fn test_build_table_info_from_column_metadatas_identical() {
        let column_metadatas = new_test_column_metadatas();
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let mut table_meta = new_test_raw_table_info();
        // Align the existing column ids with the metadata so nothing should change.
        table_meta.column_ids = vec![0, 1, 2];
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let new_table_meta = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap();
        assert_eq!(new_table_meta, table_meta);
    }

    /// An extra reserved `__table_id` tag column in the region metadata is
    /// appended. It becomes primary-key index 3 and its reserved id is appended to
    /// `column_ids`. `next_column_id` stays unchanged.
    #[test]
    fn test_build_table_info_from_column_metadatas() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "__table_id",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Tag,
            column_id: ReservedColumnId::table_id(),
        });

        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let new_table_meta = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap();

        assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
        assert_eq!(new_table_meta.partition_key_indices, vec![2]);
        assert_eq!(new_table_meta.value_indices, vec![1, 2]);
        assert_eq!(new_table_meta.schema.timestamp_index(), Some(1));
        assert_eq!(
            new_table_meta.column_ids,
            vec![0, 1, 2, ReservedColumnId::table_id()]
        );
        assert_eq!(new_table_meta.next_column_id, table_meta.next_column_id);
    }

    /// A column id in `name_to_ids` that disagrees with the column metadata
    /// (`col2`: 3 vs 2) must fail with `MismatchColumnId`.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
        let column_metadatas = new_test_column_metadatas();
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            // Change column id of col2 to 3.
            ("col2".to_string(), 3),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap_err();

        assert_matches!(err, Error::MismatchColumnId { .. });
    }

    /// Column metadata without a timestamp column must be rejected.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap_err();

        // NOTE(review): the expected text says "table index". Confirm it matches the
        // error message defined in `crate::error`, and whether "time index" was intended.
        assert!(
            err.to_string()
                .contains("Missing table index in column metadata"),
            "err: {}",
            err
        );
    }

    /// Column metadata lacking the primary-key column or the partition-key column
    /// must fail with `MissingColumnInColumnMetadata`.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_missing_column() {
        let mut column_metadatas = new_test_column_metadatas();
        // Remove primary key column.
        column_metadatas.retain(|c| c.column_id != 0);
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids.clone()),
            &column_metadatas,
        )
        .unwrap_err();
        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });

        let mut column_metadatas = new_test_column_metadatas();
        // Remove partition key column.
        column_metadatas.retain(|c| c.column_id != 2);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap_err();
        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
    }

    /// Identical column metadata on two regions of table 1024 yields that metadata.
    /// The second case differs only in the table id of the first region (1025 vs
    /// 1024) and is expected to yield `None`.
    #[test]
    fn test_check_column_metadatas_consistent() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
        let result =
            check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
        assert_eq!(result, column_metadatas);

        let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
        let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
        assert!(result.is_none());
    }

    /// Appending a new field column (`col3`, id 3) satisfies the invariants.
    #[test]
    fn test_check_column_metadata_invariants() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
    }

    /// Dropping the timestamp column, or the tag column, from the new metadata
    /// violates the invariants.
    #[test]
    fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();

        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
    }

    /// Changing the column id (to 100) of the timestamp column, or of the tag
    /// column, violates the invariants.
    #[test]
    fn test_check_column_metadata_invariants_mismatch_column_id() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        if let Some(col) = new_column_metadatas
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Timestamp)
        {
            col.column_id = 100;
        }
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();

        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        if let Some(col) = new_column_metadatas
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Tag)
        {
            col.column_id = 100;
        }
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
    }

    /// The metasrv column metadata has an extra `col3`. The only region, which
    /// lacks it, is expected in the returned region ids.
    #[test]
    fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
        metasrv_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });
        let result =
            resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
                .unwrap();

        assert_eq!(result, vec![RegionId::new(1024, 0)]);
    }

    /// Region 1 has `schema_version` 2 and an extra `col3`. Its column metadata is
    /// expected to be chosen, and region 0 is the region id returned.
    #[test]
    fn test_resolve_column_metadatas_with_use_latest_strategy() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });

        let mut region_metadata2 =
            build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
        region_metadata2.schema_version = 2;

        let (resolved_column_metadatas, region_ids) =
            resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
        assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
        assert_eq!(resolved_column_metadatas, new_column_metadatas);
    }
}