common_meta/reconciliation/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display};
17use std::ops::AddAssign;
18use std::time::Instant;
19
20use api::v1::SemanticType;
21use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
22use common_telemetry::{error, warn};
23use datatypes::schema::ColumnSchema;
24use futures::future::{join_all, try_join_all};
25use snafu::{ensure, OptionExt, ResultExt};
26use store_api::metadata::{ColumnMetadata, RegionMetadata};
27use store_api::storage::{RegionId, TableId};
28use table::metadata::{RawTableInfo, RawTableMeta};
29use table::table_name::TableName;
30use table::table_reference::TableReference;
31
32use crate::cache_invalidator::CacheInvalidatorRef;
33use crate::error::{
34    ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
35    MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
36    ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
37    WaitProcedureSnafu,
38};
39use crate::key::TableMetadataManagerRef;
40use crate::metrics;
41use crate::node_manager::NodeManagerRef;
42use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
43use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
44use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
45
46#[derive(Debug, PartialEq, Eq)]
47pub(crate) struct PartialRegionMetadata<'a> {
48    pub(crate) column_metadatas: &'a [ColumnMetadata],
49    pub(crate) primary_key: &'a [u32],
50    pub(crate) table_id: TableId,
51}
52
53impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
54    fn from(region_metadata: &'a RegionMetadata) -> Self {
55        Self {
56            column_metadatas: &region_metadata.column_metadatas,
57            primary_key: &region_metadata.primary_key,
58            table_id: region_metadata.region_id.table_id(),
59        }
60    }
61}
62
63/// Checks if the column metadatas are consistent.
64///
65/// The column metadatas are consistent if:
66/// - The column metadatas are the same.
67/// - The primary key are the same.
68/// - The table id of the region metadatas are the same.
69///
70/// ## Panic
71/// Panic if region_metadatas is empty.
72pub(crate) fn check_column_metadatas_consistent(
73    region_metadatas: &[RegionMetadata],
74) -> Option<Vec<ColumnMetadata>> {
75    let is_column_metadata_consistent = region_metadatas
76        .windows(2)
77        .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
78
79    if !is_column_metadata_consistent {
80        return None;
81    }
82
83    Some(region_metadatas[0].column_metadatas.clone())
84}
85
86/// Resolves column metadata inconsistencies among the given region metadatas
87/// by using the column metadata from the metasrv as the source of truth.
88///
89/// All region metadatas whose column metadata differs from the given `column_metadatas`
90/// will be marked for reconciliation.
91///
92/// Returns the region ids that need to be reconciled.
93pub(crate) fn resolve_column_metadatas_with_metasrv(
94    column_metadatas: &[ColumnMetadata],
95    region_metadatas: &[RegionMetadata],
96) -> Result<Vec<RegionId>> {
97    let is_same_table = region_metadatas
98        .windows(2)
99        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
100
101    ensure!(
102        is_same_table,
103        UnexpectedSnafu {
104            err_msg: "Region metadatas are not from the same table"
105        }
106    );
107
108    let mut regions_ids = vec![];
109    for region_metadata in region_metadatas {
110        if region_metadata.column_metadatas != column_metadatas {
111            check_column_metadata_invariants(column_metadatas, &region_metadata.column_metadatas)?;
112            regions_ids.push(region_metadata.region_id);
113        }
114    }
115    Ok(regions_ids)
116}
117
118/// Resolves column metadata inconsistencies among the given region metadatas
119/// by selecting the column metadata with the highest schema version.
120///
121/// This strategy assumes that at most two versions of column metadata may exist,
122/// due to the poison mechanism, making the highest schema version a safe choice.
123///
124/// Returns the resolved column metadata and the region ids that need to be reconciled.
125pub(crate) fn resolve_column_metadatas_with_latest(
126    region_metadatas: &[RegionMetadata],
127) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
128    let is_same_table = region_metadatas
129        .windows(2)
130        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
131
132    ensure!(
133        is_same_table,
134        UnexpectedSnafu {
135            err_msg: "Region metadatas are not from the same table"
136        }
137    );
138
139    let latest_region_metadata = region_metadatas
140        .iter()
141        .max_by_key(|c| c.schema_version)
142        .context(UnexpectedSnafu {
143            err_msg: "All Region metadatas have the same schema version",
144        })?;
145    let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
146
147    let mut region_ids = vec![];
148    for region_metadata in region_metadatas {
149        if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
150            check_column_metadata_invariants(
151                &latest_region_metadata.column_metadatas,
152                &region_metadata.column_metadatas,
153            )?;
154            region_ids.push(region_metadata.region_id);
155        }
156    }
157
158    // TODO(weny): verify the new column metadatas are acceptable for regions.
159    Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
160}
161
162/// Constructs a vector of [`ColumnMetadata`] from the provided table information.
163///
164/// This function maps each [`ColumnSchema`] to its corresponding [`ColumnMetadata`] by
165/// determining the semantic type (Tag, Timestamp, or Field) and retrieving the column ID
166/// from the `name_to_ids` mapping.
167///
168/// Returns an error if any column name is missing in the mapping.
169pub(crate) fn build_column_metadata_from_table_info(
170    column_schemas: &[ColumnSchema],
171    primary_key_indexes: &[usize],
172    name_to_ids: &HashMap<String, u32>,
173) -> Result<Vec<ColumnMetadata>> {
174    let primary_names = primary_key_indexes
175        .iter()
176        .map(|i| column_schemas[*i].name.as_str())
177        .collect::<HashSet<_>>();
178
179    column_schemas
180        .iter()
181        .map(|column_schema| {
182            let column_id = *name_to_ids
183                .get(column_schema.name.as_str())
184                .with_context(|| UnexpectedSnafu {
185                    err_msg: format!(
186                        "Column name {} not found in name_to_ids",
187                        column_schema.name
188                    ),
189                })?;
190
191            let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
192                SemanticType::Tag
193            } else if column_schema.is_time_index() {
194                SemanticType::Timestamp
195            } else {
196                SemanticType::Field
197            };
198            Ok(ColumnMetadata {
199                column_schema: column_schema.clone(),
200                semantic_type,
201                column_id,
202            })
203        })
204        .collect::<Result<Vec<_>>>()
205}
206
207/// Checks whether the schema invariants hold between the existing and new column metadata.
208///
209/// Invariants:
210/// - Primary key (Tag) columns must exist in the new metadata, with identical name and ID.
211/// - Timestamp column must remain exactly the same in name and ID.
212pub(crate) fn check_column_metadata_invariants(
213    new_column_metadatas: &[ColumnMetadata],
214    column_metadatas: &[ColumnMetadata],
215) -> Result<()> {
216    let new_primary_keys = new_column_metadatas
217        .iter()
218        .filter(|c| c.semantic_type == SemanticType::Tag)
219        .map(|c| (c.column_schema.name.as_str(), c.column_id))
220        .collect::<HashMap<_, _>>();
221
222    let old_primary_keys = column_metadatas
223        .iter()
224        .filter(|c| c.semantic_type == SemanticType::Tag)
225        .map(|c| (c.column_schema.name.as_str(), c.column_id));
226
227    for (name, id) in old_primary_keys {
228        let column_id = new_primary_keys
229            .get(name)
230            .cloned()
231            .context(ColumnNotFoundSnafu {
232                column_name: name,
233                column_id: id,
234            })?;
235
236        ensure!(
237            column_id == id,
238            ColumnIdMismatchSnafu {
239                column_name: name,
240                expected_column_id: id,
241                actual_column_id: column_id,
242            }
243        );
244    }
245
246    let new_ts_column = new_column_metadatas
247        .iter()
248        .find(|c| c.semantic_type == SemanticType::Timestamp)
249        .map(|c| (c.column_schema.name.as_str(), c.column_id))
250        .context(UnexpectedSnafu {
251            err_msg: "Timestamp column not found in new column metadata",
252        })?;
253
254    let old_ts_column = column_metadatas
255        .iter()
256        .find(|c| c.semantic_type == SemanticType::Timestamp)
257        .map(|c| (c.column_schema.name.as_str(), c.column_id))
258        .context(UnexpectedSnafu {
259            err_msg: "Timestamp column not found in column metadata",
260        })?;
261    ensure!(
262        new_ts_column == old_ts_column,
263        TimestampMismatchSnafu {
264            expected_column_name: old_ts_column.0,
265            expected_column_id: old_ts_column.1,
266            actual_column_name: new_ts_column.0,
267            actual_column_id: new_ts_column.1,
268        }
269    );
270
271    Ok(())
272}
273
274/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
275///
276/// Returns an error if:
277/// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided).
278/// - The column id in table metadata is not the same as the column id in the column metadata.(if `name_to_ids` is provided)
279/// - The table index is missing in the column metadata.
280/// - The primary key or partition key columns are missing in the column metadata.
281///
282/// TODO(weny): add tests
283pub(crate) fn build_table_meta_from_column_metadatas(
284    table_id: TableId,
285    table_ref: TableReference,
286    table_meta: &RawTableMeta,
287    name_to_ids: Option<HashMap<String, u32>>,
288    column_metadata: &[ColumnMetadata],
289) -> Result<RawTableMeta> {
290    let column_in_column_metadata = column_metadata
291        .iter()
292        .map(|c| (c.column_schema.name.as_str(), c))
293        .collect::<HashMap<_, _>>();
294    let primary_key_names = table_meta
295        .primary_key_indices
296        .iter()
297        .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
298        .collect::<HashSet<_>>();
299    let partition_key_names = table_meta
300        .partition_key_indices
301        .iter()
302        .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
303        .collect::<HashSet<_>>();
304    ensure!(
305        column_metadata
306            .iter()
307            .any(|c| c.semantic_type == SemanticType::Timestamp),
308        UnexpectedSnafu {
309            err_msg: format!(
310                "Missing table index in column metadata, table: {}, table_id: {}",
311                table_ref, table_id
312            ),
313        }
314    );
315
316    if let Some(name_to_ids) = &name_to_ids {
317        // Ensures all primary key and partition key exists in the column metadata.
318        for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
319            let column_in_column_metadata = column_in_column_metadata
320                .get(column_name)
321                .with_context(|| MissingColumnInColumnMetadataSnafu {
322                    column_name: column_name.to_string(),
323                    table_name: table_ref.to_string(),
324                    table_id,
325                })?;
326
327            let column_id = *name_to_ids
328                .get(*column_name)
329                .with_context(|| UnexpectedSnafu {
330                    err_msg: format!("column id not found in name_to_ids: {}", column_name),
331                })?;
332            ensure!(
333                column_id == column_in_column_metadata.column_id,
334                MismatchColumnIdSnafu {
335                    column_name: column_name.to_string(),
336                    column_id,
337                    table_name: table_ref.to_string(),
338                    table_id,
339                }
340            );
341        }
342    } else {
343        warn!(
344            "`name_to_ids` is not provided, table: {}, table_id: {}",
345            table_ref, table_id
346        );
347    }
348
349    let mut new_raw_table_meta = table_meta.clone();
350    let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
351    let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
352    let value_indices = &mut new_raw_table_meta.value_indices;
353    let time_index = &mut new_raw_table_meta.schema.timestamp_index;
354    let columns = &mut new_raw_table_meta.schema.column_schemas;
355    let column_ids = &mut new_raw_table_meta.column_ids;
356    let next_column_id = &mut new_raw_table_meta.next_column_id;
357
358    column_ids.clear();
359    value_indices.clear();
360    columns.clear();
361    primary_key_indices.clear();
362    partition_key_indices.clear();
363
364    for (idx, col) in column_metadata.iter().enumerate() {
365        if partition_key_names.contains(&col.column_schema.name.as_str()) {
366            partition_key_indices.push(idx);
367        }
368        match col.semantic_type {
369            SemanticType::Tag => {
370                primary_key_indices.push(idx);
371            }
372            SemanticType::Field => {
373                value_indices.push(idx);
374            }
375            SemanticType::Timestamp => {
376                value_indices.push(idx);
377                *time_index = Some(idx);
378            }
379        }
380
381        columns.push(col.column_schema.clone());
382        column_ids.push(col.column_id);
383    }
384
385    *next_column_id = column_ids
386        .iter()
387        .max()
388        .map(|max| max + 1)
389        .unwrap_or(*next_column_id)
390        .max(*next_column_id);
391
392    if let Some(time_index) = *time_index {
393        new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
394    }
395
396    Ok(new_raw_table_meta)
397}
398
399/// Returns true if the logical table info needs to be updated.
400///
401/// The logical table only support to add columns, so we can check the length of column metadatas
402/// to determine whether the logical table info needs to be updated.
403pub(crate) fn need_update_logical_table_info(
404    table_info: &RawTableInfo,
405    column_metadatas: &[ColumnMetadata],
406) -> bool {
407    table_info.meta.schema.column_schemas.len() != column_metadatas.len()
408}
409
410/// The result of waiting for inflight subprocedures.
411pub struct PartialSuccessResult<'a> {
412    pub failed_procedures: Vec<&'a SubprocedureMeta>,
413    pub success_procedures: Vec<&'a SubprocedureMeta>,
414}
415
416/// The result of waiting for inflight subprocedures.
417pub enum WaitForInflightSubproceduresResult<'a> {
418    Success(Vec<&'a SubprocedureMeta>),
419    PartialSuccess(PartialSuccessResult<'a>),
420}
421
422/// Wait for inflight subprocedures.
423///
424/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
425/// Otherwise, the function will continue waiting for all subprocedures to complete.
426pub(crate) async fn wait_for_inflight_subprocedures<'a>(
427    procedure_ctx: &ProcedureContext,
428    subprocedures: &'a [SubprocedureMeta],
429    fail_fast: bool,
430) -> Result<WaitForInflightSubproceduresResult<'a>> {
431    let mut receivers = Vec::with_capacity(subprocedures.len());
432    for subprocedure in subprocedures {
433        let procedure_id = subprocedure.procedure_id();
434        let receiver = procedure_ctx
435            .provider
436            .procedure_state_receiver(procedure_id)
437            .await
438            .context(ProcedureStateReceiverSnafu { procedure_id })?
439            .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
440        receivers.push((receiver, subprocedure));
441    }
442
443    let mut tasks = Vec::with_capacity(receivers.len());
444    for (receiver, subprocedure) in receivers.iter_mut() {
445        tasks.push(async move {
446            watcher::wait(receiver).await.inspect_err(|e| {
447                error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
448            })
449        });
450    }
451
452    if fail_fast {
453        try_join_all(tasks).await.context(WaitProcedureSnafu)?;
454        return Ok(WaitForInflightSubproceduresResult::Success(
455            subprocedures.iter().collect(),
456        ));
457    }
458
459    // If fail_fast is false, we need to wait for all subprocedures to complete.
460    let results = join_all(tasks).await;
461    let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
462    if failed_procedures_num == 0 {
463        return Ok(WaitForInflightSubproceduresResult::Success(
464            subprocedures.iter().collect(),
465        ));
466    }
467    warn!(
468        "{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
469        failed_procedures_num,
470        subprocedures.len(),
471        procedure_ctx.procedure_id
472    );
473
474    let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
475    let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
476    for (result, subprocedure) in results.into_iter().zip(subprocedures) {
477        if result.is_err() {
478            failed_procedures.push(subprocedure);
479        } else {
480            success_procedures.push(subprocedure);
481        }
482    }
483
484    Ok(WaitForInflightSubproceduresResult::PartialSuccess(
485        PartialSuccessResult {
486            failed_procedures,
487            success_procedures,
488        },
489    ))
490}
491
492#[derive(Clone)]
493pub struct Context {
494    pub node_manager: NodeManagerRef,
495    pub table_metadata_manager: TableMetadataManagerRef,
496    pub cache_invalidator: CacheInvalidatorRef,
497}
498
499/// Metadata for an inflight physical table subprocedure.
500pub struct PhysicalTableMeta {
501    pub procedure_id: ProcedureId,
502    pub table_id: TableId,
503    pub table_name: TableName,
504}
505
506/// Metadata for an inflight logical table subprocedure.
507pub struct LogicalTableMeta {
508    pub procedure_id: ProcedureId,
509    pub physical_table_id: TableId,
510    pub physical_table_name: TableName,
511    pub logical_tables: Vec<(TableId, TableName)>,
512}
513
514/// Metadata for an inflight database subprocedure.
515pub struct ReconcileDatabaseMeta {
516    pub procedure_id: ProcedureId,
517    pub catalog: String,
518    pub schema: String,
519}
520
521/// The inflight subprocedure metadata.
522pub enum SubprocedureMeta {
523    PhysicalTable(PhysicalTableMeta),
524    LogicalTable(LogicalTableMeta),
525    Database(ReconcileDatabaseMeta),
526}
527
528impl Display for SubprocedureMeta {
529    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
530        match self {
531            SubprocedureMeta::PhysicalTable(meta) => {
532                write!(
533                    f,
534                    "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
535                    meta.procedure_id, meta.table_id, meta.table_name
536                )
537            }
538            SubprocedureMeta::LogicalTable(meta) => {
539                write!(
540                    f,
541                    "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
542                    meta.procedure_id, meta.physical_table_id, meta.physical_table_name, meta.logical_tables
543                )
544            }
545            SubprocedureMeta::Database(meta) => {
546                write!(
547                    f,
548                    "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
549                    meta.procedure_id, meta.catalog, meta.schema
550                )
551            }
552        }
553    }
554}
555
556impl SubprocedureMeta {
557    /// Creates a new logical table subprocedure metadata.
558    pub fn new_logical_table(
559        procedure_id: ProcedureId,
560        physical_table_id: TableId,
561        physical_table_name: TableName,
562        logical_tables: Vec<(TableId, TableName)>,
563    ) -> Self {
564        Self::LogicalTable(LogicalTableMeta {
565            procedure_id,
566            physical_table_id,
567            physical_table_name,
568            logical_tables,
569        })
570    }
571
572    /// Creates a new physical table subprocedure metadata.
573    pub fn new_physical_table(
574        procedure_id: ProcedureId,
575        table_id: TableId,
576        table_name: TableName,
577    ) -> Self {
578        Self::PhysicalTable(PhysicalTableMeta {
579            procedure_id,
580            table_id,
581            table_name,
582        })
583    }
584
585    /// Creates a new reconcile database subprocedure metadata.
586    pub fn new_reconcile_database(
587        procedure_id: ProcedureId,
588        catalog: String,
589        schema: String,
590    ) -> Self {
591        Self::Database(ReconcileDatabaseMeta {
592            procedure_id,
593            catalog,
594            schema,
595        })
596    }
597
598    /// Returns the procedure id of the subprocedure.
599    pub fn procedure_id(&self) -> ProcedureId {
600        match self {
601            SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
602            SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
603            SubprocedureMeta::Database(meta) => meta.procedure_id,
604        }
605    }
606
607    /// Returns the number of tables will be reconciled.
608    pub fn table_num(&self) -> usize {
609        match self {
610            SubprocedureMeta::PhysicalTable(_) => 1,
611            SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
612            SubprocedureMeta::Database(_) => 0,
613        }
614    }
615
616    /// Returns the number of databases will be reconciled.
617    pub fn database_num(&self) -> usize {
618        match self {
619            SubprocedureMeta::Database(_) => 1,
620            _ => 0,
621        }
622    }
623}
624
/// Counters collected while reconciling a catalog.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
    /// Number of databases reconciled successfully.
    pub succeeded_databases: usize,
    /// Number of databases whose reconciliation failed.
    pub failed_databases: usize,
}

impl AddAssign for ReconcileCatalogMetrics {
    /// Adds each counter of `rhs` to the matching counter of `self`.
    fn add_assign(&mut self, rhs: Self) {
        let Self {
            succeeded_databases,
            failed_databases,
        } = rhs;
        self.succeeded_databases += succeeded_databases;
        self.failed_databases += failed_databases;
    }
}

impl Display for ReconcileCatalogMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_databases,
            failed_databases,
        } = self;
        write!(
            f,
            "succeeded_databases: {}, failed_databases: {}",
            succeeded_databases, failed_databases
        )
    }
}
648
649impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
650    fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
651        match result {
652            WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
653                succeeded_databases: subprocedures.len(),
654                failed_databases: 0,
655            },
656            WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
657                failed_procedures,
658                success_procedures,
659            }) => {
660                let succeeded_databases = success_procedures
661                    .iter()
662                    .map(|subprocedure| subprocedure.database_num())
663                    .sum();
664                let failed_databases = failed_procedures
665                    .iter()
666                    .map(|subprocedure| subprocedure.database_num())
667                    .sum();
668                ReconcileCatalogMetrics {
669                    succeeded_databases,
670                    failed_databases,
671                }
672            }
673        }
674    }
675}
676
/// Counters collected while reconciling a database.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
    /// Number of tables reconciled successfully.
    pub succeeded_tables: usize,
    /// Number of tables whose reconciliation failed.
    pub failed_tables: usize,
    /// Number of subprocedures that succeeded.
    pub succeeded_procedures: usize,
    /// Number of subprocedures that failed.
    pub failed_procedures: usize,
}

impl Display for ReconcileDatabaseMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = self;
        write!(
            f,
            "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}",
            succeeded_tables, failed_tables, succeeded_procedures, failed_procedures
        )
    }
}

impl AddAssign for ReconcileDatabaseMetrics {
    /// Adds each counter of `rhs` to the matching counter of `self`.
    fn add_assign(&mut self, rhs: Self) {
        let Self {
            succeeded_tables,
            failed_tables,
            succeeded_procedures,
            failed_procedures,
        } = rhs;
        self.succeeded_tables += succeeded_tables;
        self.failed_tables += failed_tables;
        self.succeeded_procedures += succeeded_procedures;
        self.failed_procedures += failed_procedures;
    }
}
700
701impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
702    fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
703        match result {
704            WaitForInflightSubproceduresResult::Success(subprocedures) => {
705                let table_num = subprocedures
706                    .iter()
707                    .map(|subprocedure| subprocedure.table_num())
708                    .sum();
709                ReconcileDatabaseMetrics {
710                    succeeded_procedures: subprocedures.len(),
711                    failed_procedures: 0,
712                    succeeded_tables: table_num,
713                    failed_tables: 0,
714                }
715            }
716            WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
717                failed_procedures,
718                success_procedures,
719            }) => {
720                let succeeded_tables = success_procedures
721                    .iter()
722                    .map(|subprocedure| subprocedure.table_num())
723                    .sum();
724                let failed_tables = failed_procedures
725                    .iter()
726                    .map(|subprocedure| subprocedure.table_num())
727                    .sum();
728                ReconcileDatabaseMetrics {
729                    succeeded_procedures: success_procedures.len(),
730                    failed_procedures: failed_procedures.len(),
731                    succeeded_tables,
732                    failed_tables,
733                }
734            }
735        }
736    }
737}
738
/// Counters collected while reconciling the logical tables of a physical table.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
    /// When the reconciliation started; set by `Default`.
    pub start_time: Instant,
    /// Number of table info updates.
    pub update_table_info_count: usize,
    /// Number of created tables.
    pub create_tables_count: usize,
    /// Number of tables whose column metadata was consistent.
    pub column_metadata_consistent_count: usize,
    /// Number of tables whose column metadata was inconsistent.
    pub column_metadata_inconsistent_count: usize,
}

impl Default for ReconcileLogicalTableMetrics {
    /// Starts the clock now with every counter at zero.
    fn default() -> Self {
        let start_time = Instant::now();
        ReconcileLogicalTableMetrics {
            start_time,
            create_tables_count: 0,
            update_table_info_count: 0,
            column_metadata_consistent_count: 0,
            column_metadata_inconsistent_count: 0,
        }
    }
}
760
761const CREATE_TABLES: &str = "create_tables";
762const UPDATE_TABLE_INFO: &str = "update_table_info";
763const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
764const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";
765
766impl ReconcileLogicalTableMetrics {
767    /// The total number of tables that have been reconciled.
768    pub fn total_table_count(&self) -> usize {
769        self.create_tables_count
770            + self.column_metadata_consistent_count
771            + self.column_metadata_inconsistent_count
772    }
773}
774
775impl Drop for ReconcileLogicalTableMetrics {
776    fn drop(&mut self) {
777        let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
778        metrics::METRIC_META_RECONCILIATION_STATS
779            .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
780            .inc_by(self.create_tables_count as u64);
781        metrics::METRIC_META_RECONCILIATION_STATS
782            .with_label_values(&[
783                procedure_name,
784                metrics::TABLE_TYPE_LOGICAL,
785                UPDATE_TABLE_INFO,
786            ])
787            .inc_by(self.update_table_info_count as u64);
788        metrics::METRIC_META_RECONCILIATION_STATS
789            .with_label_values(&[
790                procedure_name,
791                metrics::TABLE_TYPE_LOGICAL,
792                COLUMN_METADATA_CONSISTENT,
793            ])
794            .inc_by(self.column_metadata_consistent_count as u64);
795        metrics::METRIC_META_RECONCILIATION_STATS
796            .with_label_values(&[
797                procedure_name,
798                metrics::TABLE_TYPE_LOGICAL,
799                COLUMN_METADATA_INCONSISTENT,
800            ])
801            .inc_by(self.column_metadata_inconsistent_count as u64);
802    }
803}
804
805impl Display for ReconcileLogicalTableMetrics {
806    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
807        let elapsed = self.start_time.elapsed();
808        if self.create_tables_count > 0 {
809            write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
810        }
811        if self.update_table_info_count > 0 {
812            write!(
813                f,
814                "update_table_info_count: {}, ",
815                self.update_table_info_count
816            )?;
817        }
818        if self.column_metadata_consistent_count > 0 {
819            write!(
820                f,
821                "column_metadata_consistent_count: {}, ",
822                self.column_metadata_consistent_count
823            )?;
824        }
825        if self.column_metadata_inconsistent_count > 0 {
826            write!(
827                f,
828                "column_metadata_inconsistent_count: {}, ",
829                self.column_metadata_inconsistent_count
830            )?;
831        }
832
833        write!(
834            f,
835            "total_table_count: {}, elapsed: {:?}",
836            self.total_table_count(),
837            elapsed
838        )
839    }
840}
841
842/// The result of resolving column metadata.
843#[derive(Clone, Copy)]
844pub enum ResolveColumnMetadataResult {
845    Consistent,
846    Inconsistent(ResolveStrategy),
847}
848
849impl Display for ResolveColumnMetadataResult {
850    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
851        match self {
852            ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
853            ResolveColumnMetadataResult::Inconsistent(strategy) => {
854                let strategy_str = strategy.as_ref();
855                write!(f, "Inconsistent({})", strategy_str)
856            }
857        }
858    }
859}
860
861/// The metrics of reconciling physical tables.
862#[derive(Clone)]
863pub struct ReconcileTableMetrics {
864    /// The start time of the reconciliation.
865    pub start_time: Instant,
866    /// The result of resolving column metadata.
867    pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
868    /// Whether the table info has been updated.
869    pub update_table_info: bool,
870}
871
872impl Drop for ReconcileTableMetrics {
873    fn drop(&mut self) {
874        if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
875            match resolve_column_metadata_result {
876                ResolveColumnMetadataResult::Consistent => {
877                    metrics::METRIC_META_RECONCILIATION_STATS
878                        .with_label_values(&[
879                            ReconcileTableProcedure::TYPE_NAME,
880                            metrics::TABLE_TYPE_PHYSICAL,
881                            COLUMN_METADATA_CONSISTENT,
882                        ])
883                        .inc();
884                }
885                ResolveColumnMetadataResult::Inconsistent(strategy) => {
886                    metrics::METRIC_META_RECONCILIATION_STATS
887                        .with_label_values(&[
888                            ReconcileTableProcedure::TYPE_NAME,
889                            metrics::TABLE_TYPE_PHYSICAL,
890                            COLUMN_METADATA_INCONSISTENT,
891                        ])
892                        .inc();
893                    metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
894                        .with_label_values(&[strategy.as_ref()])
895                        .inc();
896                }
897            }
898        }
899        if self.update_table_info {
900            metrics::METRIC_META_RECONCILIATION_STATS
901                .with_label_values(&[
902                    ReconcileTableProcedure::TYPE_NAME,
903                    metrics::TABLE_TYPE_PHYSICAL,
904                    UPDATE_TABLE_INFO,
905                ])
906                .inc();
907        }
908    }
909}
910
911impl Default for ReconcileTableMetrics {
912    fn default() -> Self {
913        Self {
914            start_time: Instant::now(),
915            resolve_column_metadata_result: None,
916            update_table_info: false,
917        }
918    }
919}
920
921impl Display for ReconcileTableMetrics {
922    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
923        let elapsed = self.start_time.elapsed();
924        if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
925            write!(
926                f,
927                "resolve_column_metadata_result: {}, ",
928                resolve_column_metadata_result
929            )?;
930        }
931        write!(
932            f,
933            "update_table_info: {}, elapsed: {:?}",
934            self.update_table_info, elapsed
935        )
936    }
937}
938
// Unit tests for the reconciliation helpers in this file. They cover:
// - building table meta from column metadata;
// - checking column metadata consistency and invariants;
// - resolving column metadata with the metasrv or the latest strategy.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use store_api::metadata::ColumnMetadata;
    use store_api::storage::RegionId;
    use table::metadata::{RawTableMeta, TableMetaBuilder};
    use table::table_reference::TableReference;

    use super::*;
    use crate::ddl::test_util::region_metadata::build_region_metadata;
    use crate::error::Error;
    use crate::reconciliation::utils::check_column_metadatas_consistent;

    /// Builds a three-column schema with schema version 123:
    /// `col1` (int32, nullable), `ts` (millisecond timestamp, non-null, time index)
    /// and `col2` (int32, nullable).
    fn new_test_schema() -> Schema {
        let column_schemas = vec![
            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
            ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
        ];
        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .version(123)
            .build()
            .unwrap()
    }

    /// Column metadata matching `new_test_schema`:
    /// `col1` is a tag (id 0), `ts` is the timestamp (id 1) and `col2` is a field (id 2).
    fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
        vec![
            ColumnMetadata {
                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 2,
            },
        ]
    }

    /// Builds table meta over `new_test_schema`: primary key is column 0 (`col1`),
    /// partition key is column 2 (`col2`), and the next column id is 4.
    ///
    /// NOTE(review): despite its name, this returns a `RawTableMeta`, not a `RawTableInfo`.
    fn new_test_raw_table_info() -> RawTableMeta {
        let mut table_meta_builder = TableMetaBuilder::empty();
        let table_meta = table_meta_builder
            .schema(Arc::new(new_test_schema()))
            .primary_key_indices(vec![0])
            .partition_key_indices(vec![2])
            .next_column_id(4)
            .build()
            .unwrap();

        table_meta.into()
    }

    /// Rebuilding table meta from column metadata that already matches it
    /// (same columns and ids) must yield an identical meta.
    #[test]
    fn test_build_table_info_from_column_metadatas_identical() {
        let column_metadatas = new_test_column_metadatas();
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let mut table_meta = new_test_raw_table_info();
        table_meta.column_ids = vec![0, 1, 2];
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let new_table_meta = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap();
        assert_eq!(new_table_meta, table_meta);
    }

    /// Adding a new tag column `col3` (id 3) extends the primary key indices
    /// and column ids, and leaves the partition key, value indices,
    /// timestamp index and next column id as asserted below.
    #[test]
    fn test_build_table_info_from_column_metadatas() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 3,
        });

        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let new_table_meta = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap();

        assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
        assert_eq!(new_table_meta.partition_key_indices, vec![2]);
        assert_eq!(new_table_meta.value_indices, vec![1, 2]);
        assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
        assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
        assert_eq!(new_table_meta.next_column_id, 4);
    }

    /// A name-to-id mapping that disagrees with the column metadata
    /// (`col2` -> 3 instead of 2) must fail with `Error::MismatchColumnId`.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
        let column_metadatas = new_test_column_metadatas();
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            // Change column id of col2 to 3.
            ("col2".to_string(), 3),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap_err();

        assert_matches!(err, Error::MismatchColumnId { .. });
    }

    /// Column metadata without the timestamp column must be rejected; the
    /// error message is expected to mention the missing time index.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
        let mut column_metadatas = new_test_column_metadatas();
        column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("Missing table index in column metadata"),
            "err: {}",
            err
        );
    }

    /// Dropping either the primary key column (id 0) or the partition key
    /// column (id 2) from the column metadata must fail with
    /// `Error::MissingColumnInColumnMetadata`.
    #[test]
    fn test_build_table_info_from_column_metadatas_with_missing_column() {
        let mut column_metadatas = new_test_column_metadatas();
        // Remove primary key column.
        column_metadatas.retain(|c| c.column_id != 0);
        let table_id = 1;
        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
        let table_meta = new_test_raw_table_info();
        let name_to_ids = HashMap::from([
            ("col1".to_string(), 0),
            ("ts".to_string(), 1),
            ("col2".to_string(), 2),
        ]);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids.clone()),
            &column_metadatas,
        )
        .unwrap_err();
        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });

        let mut column_metadatas = new_test_column_metadatas();
        // Remove partition key column.
        column_metadatas.retain(|c| c.column_id != 2);

        let err = build_table_meta_from_column_metadatas(
            table_id,
            table_ref,
            &table_meta,
            Some(name_to_ids),
            &column_metadatas,
        )
        .unwrap_err();
        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
    }

    /// Two regions with identical column metadata are reported as consistent,
    /// and the shared column metadata is returned. The second case uses
    /// `RegionId::new(1025, 0)` and `RegionId::new(1024, 1)` (presumably
    /// regions of different tables) with the same metadata and expects `None`.
    #[test]
    fn test_check_column_metadatas_consistent() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
        let result =
            check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
        assert_eq!(result, column_metadatas);

        let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
        let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
        assert!(result.is_none());
    }

    /// Appending a new field column (`col3`, id 3) to the original column
    /// metadata passes the invariant check.
    #[test]
    fn test_check_column_metadata_invariants() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
    }

    /// Removing the timestamp column, or the tag column, from the new column
    /// metadata fails the invariant check.
    #[test]
    fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();

        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
    }

    /// Changing the column id of the timestamp column, or of the tag column,
    /// to 100 fails the invariant check.
    #[test]
    fn test_check_column_metadata_invariants_mismatch_column_id() {
        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        if let Some(col) = new_column_metadatas
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Timestamp)
        {
            col.column_id = 100;
        }
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();

        let column_metadatas = new_test_column_metadatas();
        let mut new_column_metadatas = column_metadatas.clone();
        if let Some(col) = new_column_metadatas
            .iter_mut()
            .find(|c| c.semantic_type == SemanticType::Tag)
        {
            col.column_id = 100;
        }
        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
    }

    /// Metasrv has an extra field column (`col3`) that region (1024, 0) lacks.
    /// That region id is expected in the result, presumably as a region whose
    /// metadata differs from metasrv's.
    #[test]
    fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
        metasrv_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });
        let result =
            resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
                .unwrap();

        assert_eq!(result, vec![RegionId::new(1024, 0)]);
    }

    /// Region (1024, 1) has a higher schema version (2) and an extra field
    /// column. Its column metadata is expected to win, and region (1024, 0) is
    /// the region id returned alongside it.
    #[test]
    fn test_resolve_column_metadatas_with_use_latest_strategy() {
        let column_metadatas = new_test_column_metadatas();
        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
        let mut new_column_metadatas = column_metadatas.clone();
        new_column_metadatas.push(ColumnMetadata {
            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        });

        let mut region_metadata2 =
            build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
        region_metadata2.schema_version = 2;

        let (resolved_column_metadatas, region_ids) =
            resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
        assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
        assert_eq!(resolved_column_metadatas, new_column_metadatas);
    }
}