common_meta/reconciliation/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display};
17use std::ops::AddAssign;
18use std::time::Instant;
19
20use api::v1::SemanticType;
21use common_procedure::{Context as ProcedureContext, ProcedureId, watcher};
22use common_telemetry::{error, warn};
23use datatypes::schema::ColumnSchema;
24use futures::future::{join_all, try_join_all};
25use snafu::{OptionExt, ResultExt, ensure};
26use store_api::metadata::{ColumnMetadata, RegionMetadata};
27use store_api::storage::consts::ReservedColumnId;
28use store_api::storage::{RegionId, TableId};
29use table::metadata::{RawTableInfo, RawTableMeta};
30use table::table_name::TableName;
31use table::table_reference::TableReference;
32
33use crate::cache_invalidator::CacheInvalidatorRef;
34use crate::error::{
35    ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
36    MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
37    ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
38    WaitProcedureSnafu,
39};
40use crate::key::TableMetadataManagerRef;
41use crate::metrics;
42use crate::node_manager::NodeManagerRef;
43use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
44use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
45use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
46
47#[derive(Debug, PartialEq, Eq)]
48pub(crate) struct PartialRegionMetadata<'a> {
49    pub(crate) column_metadatas: &'a [ColumnMetadata],
50    pub(crate) primary_key: &'a [u32],
51    pub(crate) table_id: TableId,
52}
53
54impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
55    fn from(region_metadata: &'a RegionMetadata) -> Self {
56        Self {
57            column_metadatas: &region_metadata.column_metadatas,
58            primary_key: &region_metadata.primary_key,
59            table_id: region_metadata.region_id.table_id(),
60        }
61    }
62}
63
64/// Checks if the column metadatas are consistent.
65///
66/// The column metadatas are consistent if:
67/// - The column metadatas are the same.
68/// - The primary key are the same.
69/// - The table id of the region metadatas are the same.
70///
71/// ## Panic
72/// Panic if region_metadatas is empty.
73pub(crate) fn check_column_metadatas_consistent(
74    region_metadatas: &[RegionMetadata],
75) -> Option<Vec<ColumnMetadata>> {
76    let is_column_metadata_consistent = region_metadatas
77        .windows(2)
78        .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
79
80    if !is_column_metadata_consistent {
81        return None;
82    }
83
84    Some(region_metadatas[0].column_metadatas.clone())
85}
86
87/// Resolves column metadata inconsistencies among the given region metadatas
88/// by using the column metadata from the metasrv as the source of truth.
89///
90/// All region metadatas whose column metadata differs from the given `column_metadatas`
91/// will be marked for reconciliation.
92///
93/// Returns the region ids that need to be reconciled.
94pub(crate) fn resolve_column_metadatas_with_metasrv(
95    column_metadatas: &[ColumnMetadata],
96    region_metadatas: &[RegionMetadata],
97) -> Result<Vec<RegionId>> {
98    let is_same_table = region_metadatas
99        .windows(2)
100        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
101
102    ensure!(
103        is_same_table,
104        UnexpectedSnafu {
105            err_msg: "Region metadatas are not from the same table"
106        }
107    );
108
109    let mut regions_ids = vec![];
110    for region_metadata in region_metadatas {
111        if region_metadata.column_metadatas != column_metadatas {
112            check_column_metadata_invariants(column_metadatas, &region_metadata.column_metadatas)?;
113            regions_ids.push(region_metadata.region_id);
114        }
115    }
116    Ok(regions_ids)
117}
118
119/// Resolves column metadata inconsistencies among the given region metadatas
120/// by selecting the column metadata with the highest schema version.
121///
122/// This strategy assumes that at most two versions of column metadata may exist,
123/// due to the poison mechanism, making the highest schema version a safe choice.
124///
125/// Returns the resolved column metadata and the region ids that need to be reconciled.
126pub(crate) fn resolve_column_metadatas_with_latest(
127    region_metadatas: &[RegionMetadata],
128) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
129    let is_same_table = region_metadatas
130        .windows(2)
131        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
132
133    ensure!(
134        is_same_table,
135        UnexpectedSnafu {
136            err_msg: "Region metadatas are not from the same table"
137        }
138    );
139
140    let latest_region_metadata = region_metadatas
141        .iter()
142        .max_by_key(|c| c.schema_version)
143        .context(UnexpectedSnafu {
144            err_msg: "All Region metadatas have the same schema version",
145        })?;
146    let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
147
148    let mut region_ids = vec![];
149    for region_metadata in region_metadatas {
150        if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
151            check_column_metadata_invariants(
152                &latest_region_metadata.column_metadatas,
153                &region_metadata.column_metadatas,
154            )?;
155            region_ids.push(region_metadata.region_id);
156        }
157    }
158
159    // TODO(weny): verify the new column metadatas are acceptable for regions.
160    Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
161}
162
163/// Constructs a vector of [`ColumnMetadata`] from the provided table information.
164///
165/// This function maps each [`ColumnSchema`] to its corresponding [`ColumnMetadata`] by
166/// determining the semantic type (Tag, Timestamp, or Field) and retrieving the column ID
167/// from the `name_to_ids` mapping.
168///
169/// Returns an error if any column name is missing in the mapping.
170pub(crate) fn build_column_metadata_from_table_info(
171    column_schemas: &[ColumnSchema],
172    primary_key_indexes: &[usize],
173    name_to_ids: &HashMap<String, u32>,
174) -> Result<Vec<ColumnMetadata>> {
175    let primary_names = primary_key_indexes
176        .iter()
177        .map(|i| column_schemas[*i].name.as_str())
178        .collect::<HashSet<_>>();
179
180    column_schemas
181        .iter()
182        .map(|column_schema| {
183            let column_id = *name_to_ids
184                .get(column_schema.name.as_str())
185                .with_context(|| UnexpectedSnafu {
186                    err_msg: format!(
187                        "Column name {} not found in name_to_ids",
188                        column_schema.name
189                    ),
190                })?;
191
192            let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
193                SemanticType::Tag
194            } else if column_schema.is_time_index() {
195                SemanticType::Timestamp
196            } else {
197                SemanticType::Field
198            };
199            Ok(ColumnMetadata {
200                column_schema: column_schema.clone(),
201                semantic_type,
202                column_id,
203            })
204        })
205        .collect::<Result<Vec<_>>>()
206}
207
208/// Checks whether the schema invariants hold between the existing and new column metadata.
209///
210/// Invariants:
211/// - Primary key (Tag) columns must exist in the new metadata, with identical name and ID.
212/// - Timestamp column must remain exactly the same in name and ID.
213pub(crate) fn check_column_metadata_invariants(
214    new_column_metadatas: &[ColumnMetadata],
215    column_metadatas: &[ColumnMetadata],
216) -> Result<()> {
217    let new_primary_keys = new_column_metadatas
218        .iter()
219        .filter(|c| c.semantic_type == SemanticType::Tag)
220        .map(|c| (c.column_schema.name.as_str(), c.column_id))
221        .collect::<HashMap<_, _>>();
222
223    let old_primary_keys = column_metadatas
224        .iter()
225        .filter(|c| c.semantic_type == SemanticType::Tag)
226        .map(|c| (c.column_schema.name.as_str(), c.column_id));
227
228    for (name, id) in old_primary_keys {
229        let column_id = new_primary_keys
230            .get(name)
231            .cloned()
232            .context(ColumnNotFoundSnafu {
233                column_name: name,
234                column_id: id,
235            })?;
236
237        ensure!(
238            column_id == id,
239            ColumnIdMismatchSnafu {
240                column_name: name,
241                expected_column_id: id,
242                actual_column_id: column_id,
243            }
244        );
245    }
246
247    let new_ts_column = new_column_metadatas
248        .iter()
249        .find(|c| c.semantic_type == SemanticType::Timestamp)
250        .map(|c| (c.column_schema.name.as_str(), c.column_id))
251        .context(UnexpectedSnafu {
252            err_msg: "Timestamp column not found in new column metadata",
253        })?;
254
255    let old_ts_column = column_metadatas
256        .iter()
257        .find(|c| c.semantic_type == SemanticType::Timestamp)
258        .map(|c| (c.column_schema.name.as_str(), c.column_id))
259        .context(UnexpectedSnafu {
260            err_msg: "Timestamp column not found in column metadata",
261        })?;
262    ensure!(
263        new_ts_column == old_ts_column,
264        TimestampMismatchSnafu {
265            expected_column_name: old_ts_column.0,
266            expected_column_id: old_ts_column.1,
267            actual_column_name: new_ts_column.0,
268            actual_column_id: new_ts_column.1,
269        }
270    );
271
272    Ok(())
273}
274
275/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
276///
277/// Returns an error if:
278/// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided).
279/// - The column id in table metadata is not the same as the column id in the column metadata.(if `name_to_ids` is provided)
280/// - The table index is missing in the column metadata.
281/// - The primary key or partition key columns are missing in the column metadata.
282///
283/// TODO(weny): add tests
284pub(crate) fn build_table_meta_from_column_metadatas(
285    table_id: TableId,
286    table_ref: TableReference,
287    table_meta: &RawTableMeta,
288    name_to_ids: Option<HashMap<String, u32>>,
289    column_metadata: &[ColumnMetadata],
290) -> Result<RawTableMeta> {
291    let column_in_column_metadata = column_metadata
292        .iter()
293        .map(|c| (c.column_schema.name.as_str(), c))
294        .collect::<HashMap<_, _>>();
295    let primary_key_names = table_meta
296        .primary_key_indices
297        .iter()
298        .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
299        .collect::<HashSet<_>>();
300    let partition_key_names = table_meta
301        .partition_key_indices
302        .iter()
303        .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
304        .collect::<HashSet<_>>();
305    ensure!(
306        column_metadata
307            .iter()
308            .any(|c| c.semantic_type == SemanticType::Timestamp),
309        UnexpectedSnafu {
310            err_msg: format!(
311                "Missing table index in column metadata, table: {}, table_id: {}",
312                table_ref, table_id
313            ),
314        }
315    );
316
317    if let Some(name_to_ids) = &name_to_ids {
318        // Ensures all primary key and partition key exists in the column metadata.
319        for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
320            let column_in_column_metadata = column_in_column_metadata
321                .get(column_name)
322                .with_context(|| MissingColumnInColumnMetadataSnafu {
323                    column_name: column_name.to_string(),
324                    table_name: table_ref.to_string(),
325                    table_id,
326                })?;
327
328            let column_id = *name_to_ids
329                .get(*column_name)
330                .with_context(|| UnexpectedSnafu {
331                    err_msg: format!("column id not found in name_to_ids: {}", column_name),
332                })?;
333            ensure!(
334                column_id == column_in_column_metadata.column_id,
335                MismatchColumnIdSnafu {
336                    column_name: column_name.to_string(),
337                    column_id,
338                    table_name: table_ref.to_string(),
339                    table_id,
340                }
341            );
342        }
343    } else {
344        warn!(
345            "`name_to_ids` is not provided, table: {}, table_id: {}",
346            table_ref, table_id
347        );
348    }
349
350    let mut new_raw_table_meta = table_meta.clone();
351    let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
352    let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
353    let value_indices = &mut new_raw_table_meta.value_indices;
354    let time_index = &mut new_raw_table_meta.schema.timestamp_index;
355    let columns = &mut new_raw_table_meta.schema.column_schemas;
356    let column_ids = &mut new_raw_table_meta.column_ids;
357    let next_column_id = &mut new_raw_table_meta.next_column_id;
358
359    column_ids.clear();
360    value_indices.clear();
361    columns.clear();
362    primary_key_indices.clear();
363    partition_key_indices.clear();
364
365    for (idx, col) in column_metadata.iter().enumerate() {
366        if partition_key_names.contains(&col.column_schema.name.as_str()) {
367            partition_key_indices.push(idx);
368        }
369        match col.semantic_type {
370            SemanticType::Tag => {
371                primary_key_indices.push(idx);
372            }
373            SemanticType::Field => {
374                value_indices.push(idx);
375            }
376            SemanticType::Timestamp => {
377                value_indices.push(idx);
378                *time_index = Some(idx);
379            }
380        }
381
382        columns.push(col.column_schema.clone());
383        column_ids.push(col.column_id);
384    }
385
386    *next_column_id = column_ids
387        .iter()
388        .filter(|id| !ReservedColumnId::is_reserved(**id))
389        .max()
390        .map(|max| max + 1)
391        .unwrap_or(*next_column_id)
392        .max(*next_column_id);
393
394    if let Some(time_index) = *time_index {
395        new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
396    }
397
398    Ok(new_raw_table_meta)
399}
400
401/// Returns true if the logical table info needs to be updated.
402///
403/// The logical table only support to add columns, so we can check the length of column metadatas
404/// to determine whether the logical table info needs to be updated.
405pub(crate) fn need_update_logical_table_info(
406    table_info: &RawTableInfo,
407    column_metadatas: &[ColumnMetadata],
408) -> bool {
409    table_info.meta.schema.column_schemas.len() != column_metadatas.len()
410}
411
412/// The result of waiting for inflight subprocedures.
413pub struct PartialSuccessResult<'a> {
414    pub failed_procedures: Vec<&'a SubprocedureMeta>,
415    pub success_procedures: Vec<&'a SubprocedureMeta>,
416}
417
418/// The result of waiting for inflight subprocedures.
419pub enum WaitForInflightSubproceduresResult<'a> {
420    Success(Vec<&'a SubprocedureMeta>),
421    PartialSuccess(PartialSuccessResult<'a>),
422}
423
424/// Wait for inflight subprocedures.
425///
426/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
427/// Otherwise, the function will continue waiting for all subprocedures to complete.
428pub(crate) async fn wait_for_inflight_subprocedures<'a>(
429    procedure_ctx: &ProcedureContext,
430    subprocedures: &'a [SubprocedureMeta],
431    fail_fast: bool,
432) -> Result<WaitForInflightSubproceduresResult<'a>> {
433    let mut receivers = Vec::with_capacity(subprocedures.len());
434    for subprocedure in subprocedures {
435        let procedure_id = subprocedure.procedure_id();
436        let receiver = procedure_ctx
437            .provider
438            .procedure_state_receiver(procedure_id)
439            .await
440            .context(ProcedureStateReceiverSnafu { procedure_id })?
441            .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
442        receivers.push((receiver, subprocedure));
443    }
444
445    let mut tasks = Vec::with_capacity(receivers.len());
446    for (receiver, subprocedure) in receivers.iter_mut() {
447        tasks.push(async move {
448            watcher::wait(receiver).await.inspect_err(|e| {
449                error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
450            })
451        });
452    }
453
454    if fail_fast {
455        try_join_all(tasks).await.context(WaitProcedureSnafu)?;
456        return Ok(WaitForInflightSubproceduresResult::Success(
457            subprocedures.iter().collect(),
458        ));
459    }
460
461    // If fail_fast is false, we need to wait for all subprocedures to complete.
462    let results = join_all(tasks).await;
463    let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
464    if failed_procedures_num == 0 {
465        return Ok(WaitForInflightSubproceduresResult::Success(
466            subprocedures.iter().collect(),
467        ));
468    }
469    warn!(
470        "{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
471        failed_procedures_num,
472        subprocedures.len(),
473        procedure_ctx.procedure_id
474    );
475
476    let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
477    let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
478    for (result, subprocedure) in results.into_iter().zip(subprocedures) {
479        if result.is_err() {
480            failed_procedures.push(subprocedure);
481        } else {
482            success_procedures.push(subprocedure);
483        }
484    }
485
486    Ok(WaitForInflightSubproceduresResult::PartialSuccess(
487        PartialSuccessResult {
488            failed_procedures,
489            success_procedures,
490        },
491    ))
492}
493
494#[derive(Clone)]
495pub struct Context {
496    pub node_manager: NodeManagerRef,
497    pub table_metadata_manager: TableMetadataManagerRef,
498    pub cache_invalidator: CacheInvalidatorRef,
499}
500
501/// Metadata for an inflight physical table subprocedure.
502pub struct PhysicalTableMeta {
503    pub procedure_id: ProcedureId,
504    pub table_id: TableId,
505    pub table_name: TableName,
506}
507
508/// Metadata for an inflight logical table subprocedure.
509pub struct LogicalTableMeta {
510    pub procedure_id: ProcedureId,
511    pub physical_table_id: TableId,
512    pub physical_table_name: TableName,
513    pub logical_tables: Vec<(TableId, TableName)>,
514}
515
516/// Metadata for an inflight database subprocedure.
517pub struct ReconcileDatabaseMeta {
518    pub procedure_id: ProcedureId,
519    pub catalog: String,
520    pub schema: String,
521}
522
523/// The inflight subprocedure metadata.
524pub enum SubprocedureMeta {
525    PhysicalTable(PhysicalTableMeta),
526    LogicalTable(LogicalTableMeta),
527    Database(ReconcileDatabaseMeta),
528}
529
530impl Display for SubprocedureMeta {
531    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
532        match self {
533            SubprocedureMeta::PhysicalTable(meta) => {
534                write!(
535                    f,
536                    "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
537                    meta.procedure_id, meta.table_id, meta.table_name
538                )
539            }
540            SubprocedureMeta::LogicalTable(meta) => {
541                write!(
542                    f,
543                    "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
544                    meta.procedure_id,
545                    meta.physical_table_id,
546                    meta.physical_table_name,
547                    meta.logical_tables
548                )
549            }
550            SubprocedureMeta::Database(meta) => {
551                write!(
552                    f,
553                    "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
554                    meta.procedure_id, meta.catalog, meta.schema
555                )
556            }
557        }
558    }
559}
560
561impl SubprocedureMeta {
562    /// Creates a new logical table subprocedure metadata.
563    pub fn new_logical_table(
564        procedure_id: ProcedureId,
565        physical_table_id: TableId,
566        physical_table_name: TableName,
567        logical_tables: Vec<(TableId, TableName)>,
568    ) -> Self {
569        Self::LogicalTable(LogicalTableMeta {
570            procedure_id,
571            physical_table_id,
572            physical_table_name,
573            logical_tables,
574        })
575    }
576
577    /// Creates a new physical table subprocedure metadata.
578    pub fn new_physical_table(
579        procedure_id: ProcedureId,
580        table_id: TableId,
581        table_name: TableName,
582    ) -> Self {
583        Self::PhysicalTable(PhysicalTableMeta {
584            procedure_id,
585            table_id,
586            table_name,
587        })
588    }
589
590    /// Creates a new reconcile database subprocedure metadata.
591    pub fn new_reconcile_database(
592        procedure_id: ProcedureId,
593        catalog: String,
594        schema: String,
595    ) -> Self {
596        Self::Database(ReconcileDatabaseMeta {
597            procedure_id,
598            catalog,
599            schema,
600        })
601    }
602
603    /// Returns the procedure id of the subprocedure.
604    pub fn procedure_id(&self) -> ProcedureId {
605        match self {
606            SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
607            SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
608            SubprocedureMeta::Database(meta) => meta.procedure_id,
609        }
610    }
611
612    /// Returns the number of tables will be reconciled.
613    pub fn table_num(&self) -> usize {
614        match self {
615            SubprocedureMeta::PhysicalTable(_) => 1,
616            SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
617            SubprocedureMeta::Database(_) => 0,
618        }
619    }
620
621    /// Returns the number of databases will be reconciled.
622    pub fn database_num(&self) -> usize {
623        match self {
624            SubprocedureMeta::Database(_) => 1,
625            _ => 0,
626        }
627    }
628}
629
/// Counters collected while reconciling a catalog.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
    /// The number of databases reconciled successfully.
    pub succeeded_databases: usize,
    /// The number of databases that failed to be reconciled.
    pub failed_databases: usize,
}
636
637impl AddAssign for ReconcileCatalogMetrics {
638    fn add_assign(&mut self, other: Self) {
639        self.succeeded_databases += other.succeeded_databases;
640        self.failed_databases += other.failed_databases;
641    }
642}
643
644impl Display for ReconcileCatalogMetrics {
645    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
646        write!(
647            f,
648            "succeeded_databases: {}, failed_databases: {}",
649            self.succeeded_databases, self.failed_databases
650        )
651    }
652}
653
654impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
655    fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
656        match result {
657            WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
658                succeeded_databases: subprocedures.len(),
659                failed_databases: 0,
660            },
661            WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
662                failed_procedures,
663                success_procedures,
664            }) => {
665                let succeeded_databases = success_procedures
666                    .iter()
667                    .map(|subprocedure| subprocedure.database_num())
668                    .sum();
669                let failed_databases = failed_procedures
670                    .iter()
671                    .map(|subprocedure| subprocedure.database_num())
672                    .sum();
673                ReconcileCatalogMetrics {
674                    succeeded_databases,
675                    failed_databases,
676                }
677            }
678        }
679    }
680}
681
/// Counters collected while reconciling a database.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
    /// The number of tables reconciled successfully.
    pub succeeded_tables: usize,
    /// The number of tables that failed to be reconciled.
    pub failed_tables: usize,
    /// The number of subprocedures that succeeded.
    pub succeeded_procedures: usize,
    /// The number of subprocedures that failed.
    pub failed_procedures: usize,
}
690
691impl Display for ReconcileDatabaseMetrics {
692    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
693        write!(
694            f,
695            "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}",
696            self.succeeded_tables,
697            self.failed_tables,
698            self.succeeded_procedures,
699            self.failed_procedures
700        )
701    }
702}
703
704impl AddAssign for ReconcileDatabaseMetrics {
705    fn add_assign(&mut self, other: Self) {
706        self.succeeded_tables += other.succeeded_tables;
707        self.failed_tables += other.failed_tables;
708        self.succeeded_procedures += other.succeeded_procedures;
709        self.failed_procedures += other.failed_procedures;
710    }
711}
712
713impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
714    fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
715        match result {
716            WaitForInflightSubproceduresResult::Success(subprocedures) => {
717                let table_num = subprocedures
718                    .iter()
719                    .map(|subprocedure| subprocedure.table_num())
720                    .sum();
721                ReconcileDatabaseMetrics {
722                    succeeded_procedures: subprocedures.len(),
723                    failed_procedures: 0,
724                    succeeded_tables: table_num,
725                    failed_tables: 0,
726                }
727            }
728            WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
729                failed_procedures,
730                success_procedures,
731            }) => {
732                let succeeded_tables = success_procedures
733                    .iter()
734                    .map(|subprocedure| subprocedure.table_num())
735                    .sum();
736                let failed_tables = failed_procedures
737                    .iter()
738                    .map(|subprocedure| subprocedure.table_num())
739                    .sum();
740                ReconcileDatabaseMetrics {
741                    succeeded_procedures: success_procedures.len(),
742                    failed_procedures: failed_procedures.len(),
743                    succeeded_tables,
744                    failed_tables,
745                }
746            }
747        }
748    }
749}
750
/// Counters collected while reconciling logical tables.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
    /// The instant the elapsed time is measured from.
    pub start_time: Instant,
    /// The number of table info updates.
    pub update_table_info_count: usize,
    /// The number of created tables.
    pub create_tables_count: usize,
    /// The number of tables whose column metadata was consistent.
    pub column_metadata_consistent_count: usize,
    /// The number of tables whose column metadata was inconsistent.
    pub column_metadata_inconsistent_count: usize,
}
760
761impl Default for ReconcileLogicalTableMetrics {
762    fn default() -> Self {
763        Self {
764            start_time: Instant::now(),
765            update_table_info_count: 0,
766            create_tables_count: 0,
767            column_metadata_consistent_count: 0,
768            column_metadata_inconsistent_count: 0,
769        }
770    }
771}
772
/// Stat label for created tables.
const CREATE_TABLES: &str = "create_tables";
/// Stat label for table info updates.
const UPDATE_TABLE_INFO: &str = "update_table_info";
/// Stat label for tables whose column metadata is consistent.
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
/// Stat label for tables whose column metadata is inconsistent.
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";
777
778impl ReconcileLogicalTableMetrics {
779    /// The total number of tables that have been reconciled.
780    pub fn total_table_count(&self) -> usize {
781        self.create_tables_count
782            + self.column_metadata_consistent_count
783            + self.column_metadata_inconsistent_count
784    }
785}
786
787impl Drop for ReconcileLogicalTableMetrics {
788    fn drop(&mut self) {
789        let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
790        metrics::METRIC_META_RECONCILIATION_STATS
791            .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
792            .inc_by(self.create_tables_count as u64);
793        metrics::METRIC_META_RECONCILIATION_STATS
794            .with_label_values(&[
795                procedure_name,
796                metrics::TABLE_TYPE_LOGICAL,
797                UPDATE_TABLE_INFO,
798            ])
799            .inc_by(self.update_table_info_count as u64);
800        metrics::METRIC_META_RECONCILIATION_STATS
801            .with_label_values(&[
802                procedure_name,
803                metrics::TABLE_TYPE_LOGICAL,
804                COLUMN_METADATA_CONSISTENT,
805            ])
806            .inc_by(self.column_metadata_consistent_count as u64);
807        metrics::METRIC_META_RECONCILIATION_STATS
808            .with_label_values(&[
809                procedure_name,
810                metrics::TABLE_TYPE_LOGICAL,
811                COLUMN_METADATA_INCONSISTENT,
812            ])
813            .inc_by(self.column_metadata_inconsistent_count as u64);
814    }
815}
816
817impl Display for ReconcileLogicalTableMetrics {
818    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
819        let elapsed = self.start_time.elapsed();
820        if self.create_tables_count > 0 {
821            write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
822        }
823        if self.update_table_info_count > 0 {
824            write!(
825                f,
826                "update_table_info_count: {}, ",
827                self.update_table_info_count
828            )?;
829        }
830        if self.column_metadata_consistent_count > 0 {
831            write!(
832                f,
833                "column_metadata_consistent_count: {}, ",
834                self.column_metadata_consistent_count
835            )?;
836        }
837        if self.column_metadata_inconsistent_count > 0 {
838            write!(
839                f,
840                "column_metadata_inconsistent_count: {}, ",
841                self.column_metadata_inconsistent_count
842            )?;
843        }
844
845        write!(
846            f,
847            "total_table_count: {}, elapsed: {:?}",
848            self.total_table_count(),
849            elapsed
850        )
851    }
852}
853
/// The result of resolving column metadata.
#[derive(Clone, Copy)]
pub enum ResolveColumnMetadataResult {
    /// Displayed as `Consistent`.
    Consistent,
    /// Carries a [`ResolveStrategy`]; displayed as `Inconsistent(<strategy>)`.
    /// NOTE(review): presumably the strategy used to resolve the
    /// inconsistency — confirm against the producer of this value.
    Inconsistent(ResolveStrategy),
}
860
861impl Display for ResolveColumnMetadataResult {
862    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
863        match self {
864            ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
865            ResolveColumnMetadataResult::Inconsistent(strategy) => {
866                let strategy_str = strategy.as_ref();
867                write!(f, "Inconsistent({})", strategy_str)
868            }
869        }
870    }
871}
872
/// The metrics of reconciling physical tables.
///
/// The recorded values are reported to the reconciliation metrics by the
/// `Drop` implementation of this type.
/// NOTE(review): `Clone` is derived, so every clone reports again when it is
/// dropped — confirm that this is intended.
#[derive(Clone)]
pub struct ReconcileTableMetrics {
    /// The start time of the reconciliation; `Display` prints the time
    /// elapsed since this instant.
    pub start_time: Instant,
    /// The result of resolving column metadata, or `None` when no result has
    /// been recorded.
    pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
    /// Whether the table info has been updated.
    pub update_table_info: bool,
}
883
884impl Drop for ReconcileTableMetrics {
885    fn drop(&mut self) {
886        if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
887            match resolve_column_metadata_result {
888                ResolveColumnMetadataResult::Consistent => {
889                    metrics::METRIC_META_RECONCILIATION_STATS
890                        .with_label_values(&[
891                            ReconcileTableProcedure::TYPE_NAME,
892                            metrics::TABLE_TYPE_PHYSICAL,
893                            COLUMN_METADATA_CONSISTENT,
894                        ])
895                        .inc();
896                }
897                ResolveColumnMetadataResult::Inconsistent(strategy) => {
898                    metrics::METRIC_META_RECONCILIATION_STATS
899                        .with_label_values(&[
900                            ReconcileTableProcedure::TYPE_NAME,
901                            metrics::TABLE_TYPE_PHYSICAL,
902                            COLUMN_METADATA_INCONSISTENT,
903                        ])
904                        .inc();
905                    metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
906                        .with_label_values(&[strategy.as_ref()])
907                        .inc();
908                }
909            }
910        }
911        if self.update_table_info {
912            metrics::METRIC_META_RECONCILIATION_STATS
913                .with_label_values(&[
914                    ReconcileTableProcedure::TYPE_NAME,
915                    metrics::TABLE_TYPE_PHYSICAL,
916                    UPDATE_TABLE_INFO,
917                ])
918                .inc();
919        }
920    }
921}
922
923impl Default for ReconcileTableMetrics {
924    fn default() -> Self {
925        Self {
926            start_time: Instant::now(),
927            resolve_column_metadata_result: None,
928            update_table_info: false,
929        }
930    }
931}
932
933impl Display for ReconcileTableMetrics {
934    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
935        let elapsed = self.start_time.elapsed();
936        if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
937            write!(
938                f,
939                "resolve_column_metadata_result: {}, ",
940                resolve_column_metadata_result
941            )?;
942        }
943        write!(
944            f,
945            "update_table_info: {}, elapsed: {:?}",
946            self.update_table_info, elapsed
947        )
948    }
949}
950
951#[cfg(test)]
952mod tests {
953    use std::assert_matches::assert_matches;
954    use std::collections::HashMap;
955    use std::sync::Arc;
956
957    use api::v1::SemanticType;
958    use datatypes::prelude::ConcreteDataType;
959    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
960    use store_api::metadata::ColumnMetadata;
961    use store_api::storage::RegionId;
962    use table::metadata::{RawTableMeta, TableMetaBuilder};
963    use table::table_reference::TableReference;
964
965    use super::*;
966    use crate::ddl::test_util::region_metadata::build_region_metadata;
967    use crate::error::Error;
968    use crate::reconciliation::utils::check_column_metadatas_consistent;
969
970    fn new_test_schema() -> Schema {
971        let column_schemas = vec![
972            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
973            ColumnSchema::new(
974                "ts",
975                ConcreteDataType::timestamp_millisecond_datatype(),
976                false,
977            )
978            .with_time_index(true),
979            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
980        ];
981        SchemaBuilder::try_from(column_schemas)
982            .unwrap()
983            .version(123)
984            .build()
985            .unwrap()
986    }
987
988    fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
989        vec![
990            ColumnMetadata {
991                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
992                semantic_type: SemanticType::Tag,
993                column_id: 0,
994            },
995            ColumnMetadata {
996                column_schema: ColumnSchema::new(
997                    "ts",
998                    ConcreteDataType::timestamp_millisecond_datatype(),
999                    false,
1000                )
1001                .with_time_index(true),
1002                semantic_type: SemanticType::Timestamp,
1003                column_id: 1,
1004            },
1005            ColumnMetadata {
1006                column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
1007                semantic_type: SemanticType::Field,
1008                column_id: 2,
1009            },
1010        ]
1011    }
1012
1013    fn new_test_raw_table_info() -> RawTableMeta {
1014        let mut table_meta_builder = TableMetaBuilder::empty();
1015        let table_meta = table_meta_builder
1016            .schema(Arc::new(new_test_schema()))
1017            .primary_key_indices(vec![0])
1018            .partition_key_indices(vec![2])
1019            .next_column_id(4)
1020            .build()
1021            .unwrap();
1022
1023        table_meta.into()
1024    }
1025
1026    #[test]
1027    fn test_build_table_info_from_column_metadatas_identical() {
1028        let column_metadatas = new_test_column_metadatas();
1029        let table_id = 1;
1030        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1031        let mut table_meta = new_test_raw_table_info();
1032        table_meta.column_ids = vec![0, 1, 2];
1033        let name_to_ids = HashMap::from([
1034            ("col1".to_string(), 0),
1035            ("ts".to_string(), 1),
1036            ("col2".to_string(), 2),
1037        ]);
1038
1039        let new_table_meta = build_table_meta_from_column_metadatas(
1040            table_id,
1041            table_ref,
1042            &table_meta,
1043            Some(name_to_ids),
1044            &column_metadatas,
1045        )
1046        .unwrap();
1047        assert_eq!(new_table_meta, table_meta);
1048    }
1049
1050    #[test]
1051    fn test_build_table_info_from_column_metadatas() {
1052        let mut column_metadatas = new_test_column_metadatas();
1053        column_metadatas.push(ColumnMetadata {
1054            column_schema: ColumnSchema::new(
1055                "__table_id",
1056                ConcreteDataType::string_datatype(),
1057                true,
1058            ),
1059            semantic_type: SemanticType::Tag,
1060            column_id: ReservedColumnId::table_id(),
1061        });
1062
1063        let table_id = 1;
1064        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1065        let table_meta = new_test_raw_table_info();
1066        let name_to_ids = HashMap::from([
1067            ("col1".to_string(), 0),
1068            ("ts".to_string(), 1),
1069            ("col2".to_string(), 2),
1070        ]);
1071
1072        let new_table_meta = build_table_meta_from_column_metadatas(
1073            table_id,
1074            table_ref,
1075            &table_meta,
1076            Some(name_to_ids),
1077            &column_metadatas,
1078        )
1079        .unwrap();
1080
1081        assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
1082        assert_eq!(new_table_meta.partition_key_indices, vec![2]);
1083        assert_eq!(new_table_meta.value_indices, vec![1, 2]);
1084        assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
1085        assert_eq!(
1086            new_table_meta.column_ids,
1087            vec![0, 1, 2, ReservedColumnId::table_id()]
1088        );
1089        assert_eq!(new_table_meta.next_column_id, table_meta.next_column_id);
1090    }
1091
1092    #[test]
1093    fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
1094        let column_metadatas = new_test_column_metadatas();
1095        let table_id = 1;
1096        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1097        let table_meta = new_test_raw_table_info();
1098        let name_to_ids = HashMap::from([
1099            ("col1".to_string(), 0),
1100            ("ts".to_string(), 1),
1101            // Change column id of col2 to 3.
1102            ("col2".to_string(), 3),
1103        ]);
1104
1105        let err = build_table_meta_from_column_metadatas(
1106            table_id,
1107            table_ref,
1108            &table_meta,
1109            Some(name_to_ids),
1110            &column_metadatas,
1111        )
1112        .unwrap_err();
1113
1114        assert_matches!(err, Error::MismatchColumnId { .. });
1115    }
1116
1117    #[test]
1118    fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
1119        let mut column_metadatas = new_test_column_metadatas();
1120        column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1121        let table_id = 1;
1122        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1123        let table_meta = new_test_raw_table_info();
1124        let name_to_ids = HashMap::from([
1125            ("col1".to_string(), 0),
1126            ("ts".to_string(), 1),
1127            ("col2".to_string(), 2),
1128        ]);
1129
1130        let err = build_table_meta_from_column_metadatas(
1131            table_id,
1132            table_ref,
1133            &table_meta,
1134            Some(name_to_ids),
1135            &column_metadatas,
1136        )
1137        .unwrap_err();
1138
1139        assert!(
1140            err.to_string()
1141                .contains("Missing table index in column metadata"),
1142            "err: {}",
1143            err
1144        );
1145    }
1146
1147    #[test]
1148    fn test_build_table_info_from_column_metadatas_with_missing_column() {
1149        let mut column_metadatas = new_test_column_metadatas();
1150        // Remove primary key column.
1151        column_metadatas.retain(|c| c.column_id != 0);
1152        let table_id = 1;
1153        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
1154        let table_meta = new_test_raw_table_info();
1155        let name_to_ids = HashMap::from([
1156            ("col1".to_string(), 0),
1157            ("ts".to_string(), 1),
1158            ("col2".to_string(), 2),
1159        ]);
1160
1161        let err = build_table_meta_from_column_metadatas(
1162            table_id,
1163            table_ref,
1164            &table_meta,
1165            Some(name_to_ids.clone()),
1166            &column_metadatas,
1167        )
1168        .unwrap_err();
1169        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1170
1171        let mut column_metadatas = new_test_column_metadatas();
1172        // Remove partition key column.
1173        column_metadatas.retain(|c| c.column_id != 2);
1174
1175        let err = build_table_meta_from_column_metadatas(
1176            table_id,
1177            table_ref,
1178            &table_meta,
1179            Some(name_to_ids),
1180            &column_metadatas,
1181        )
1182        .unwrap_err();
1183        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
1184    }
1185
1186    #[test]
1187    fn test_check_column_metadatas_consistent() {
1188        let column_metadatas = new_test_column_metadatas();
1189        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1190        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1191        let result =
1192            check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
1193        assert_eq!(result, column_metadatas);
1194
1195        let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
1196        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
1197        let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
1198        assert!(result.is_none());
1199    }
1200
1201    #[test]
1202    fn test_check_column_metadata_invariants() {
1203        let column_metadatas = new_test_column_metadatas();
1204        let mut new_column_metadatas = column_metadatas.clone();
1205        new_column_metadatas.push(ColumnMetadata {
1206            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1207            semantic_type: SemanticType::Field,
1208            column_id: 3,
1209        });
1210        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
1211    }
1212
1213    #[test]
1214    fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
1215        let column_metadatas = new_test_column_metadatas();
1216        let mut new_column_metadatas = column_metadatas.clone();
1217        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
1218        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1219
1220        let column_metadatas = new_test_column_metadatas();
1221        let mut new_column_metadatas = column_metadatas.clone();
1222        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
1223        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1224    }
1225
1226    #[test]
1227    fn test_check_column_metadata_invariants_mismatch_column_id() {
1228        let column_metadatas = new_test_column_metadatas();
1229        let mut new_column_metadatas = column_metadatas.clone();
1230        if let Some(col) = new_column_metadatas
1231            .iter_mut()
1232            .find(|c| c.semantic_type == SemanticType::Timestamp)
1233        {
1234            col.column_id = 100;
1235        }
1236        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1237
1238        let column_metadatas = new_test_column_metadatas();
1239        let mut new_column_metadatas = column_metadatas.clone();
1240        if let Some(col) = new_column_metadatas
1241            .iter_mut()
1242            .find(|c| c.semantic_type == SemanticType::Tag)
1243        {
1244            col.column_id = 100;
1245        }
1246        check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
1247    }
1248
1249    #[test]
1250    fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
1251        let column_metadatas = new_test_column_metadatas();
1252        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1253        let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
1254        metasrv_column_metadatas.push(ColumnMetadata {
1255            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1256            semantic_type: SemanticType::Field,
1257            column_id: 3,
1258        });
1259        let result =
1260            resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
1261                .unwrap();
1262
1263        assert_eq!(result, vec![RegionId::new(1024, 0)]);
1264    }
1265
1266    #[test]
1267    fn test_resolve_column_metadatas_with_use_latest_strategy() {
1268        let column_metadatas = new_test_column_metadatas();
1269        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
1270        let mut new_column_metadatas = column_metadatas.clone();
1271        new_column_metadatas.push(ColumnMetadata {
1272            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
1273            semantic_type: SemanticType::Field,
1274            column_id: 3,
1275        });
1276
1277        let mut region_metadata2 =
1278            build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
1279        region_metadata2.schema_version = 2;
1280
1281        let (resolved_column_metadatas, region_ids) =
1282            resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
1283        assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
1284        assert_eq!(resolved_column_metadatas, new_column_metadatas);
1285    }
1286}