common_meta/reconciliation/reconcile_logical_tables/
resolve_table_metadatas.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::{info, warn};
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
23use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
24use crate::error::{self, Result};
25use crate::metrics;
26use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions;
27use crate::reconciliation::reconcile_logical_tables::{
28    ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
29};
30use crate::reconciliation::utils::{
31    check_column_metadatas_consistent, need_update_logical_table_info,
32};
33
/// Procedure state that inspects the region metadata of every logical table
/// and decides, per table, whether it has to be (re)created on the datanodes
/// or whether its stored table info needs to be updated.
///
/// The state carries no data of its own; everything it reads and writes lives
/// in [`ReconcileLogicalTablesContext`].
#[derive(Debug, Serialize, Deserialize)]
pub struct ResolveTableMetadatas;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for ResolveTableMetadatas {
40    async fn next(
41        &mut self,
42        ctx: &mut ReconcileLogicalTablesContext,
43        _procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        let table_names = ctx
46            .persistent_ctx
47            .logical_tables
48            .iter()
49            .map(|t| t.table_ref())
50            .collect::<Vec<_>>();
51        let table_ids = &ctx.persistent_ctx.logical_table_ids;
52
53        let mut create_tables = vec![];
54        let mut update_table_infos = vec![];
55
56        let table_info_values = get_all_table_info_values_by_table_ids(
57            ctx.table_metadata_manager.table_info_manager(),
58            table_ids,
59            &table_names,
60        )
61        .await?;
62
63        // Safety: The physical table route is set in `ReconciliationStart` state.
64        let region_routes = &ctx
65            .persistent_ctx
66            .physical_table_route
67            .as_ref()
68            .unwrap()
69            .region_routes;
70        let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
71        let mut metadata_consistent_count = 0;
72        let mut metadata_inconsistent_count = 0;
73        let mut create_tables_count = 0;
74        for (table_id, table_info_value) in table_ids.iter().zip(table_info_values.iter()) {
75            let region_metadatas = {
76                let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
77                    .with_label_values(&[metrics::TABLE_TYPE_LOGICAL])
78                    .start_timer();
79                region_metadata_lister
80                    .list(*table_id, region_routes)
81                    .await?
82            };
83
84            ensure!(!region_metadatas.is_empty(), {
85                metrics::METRIC_META_RECONCILIATION_STATS
86                    .with_label_values(&[
87                        ReconcileLogicalTablesProcedure::TYPE_NAME,
88                        metrics::TABLE_TYPE_LOGICAL,
89                        metrics::STATS_TYPE_NO_REGION_METADATA,
90                    ])
91                    .inc();
92
93                error::UnexpectedSnafu {
94                    err_msg: format!(
95                        "No region metadata found for table: {}, table_id: {}",
96                        table_info_value.table_info.name, table_id
97                    ),
98                }
99            });
100
101            if region_metadatas.iter().any(|r| r.is_none()) {
102                create_tables_count += 1;
103                create_tables.push((*table_id, table_info_value.table_info.clone()));
104                continue;
105            }
106
107            // Safety: The physical table route is set in `ReconciliationStart` state.
108            let region_metadatas = region_metadatas
109                .into_iter()
110                .map(|r| r.unwrap())
111                .collect::<Vec<_>>();
112            if let Some(column_metadatas) = check_column_metadatas_consistent(&region_metadatas) {
113                metadata_consistent_count += 1;
114                if need_update_logical_table_info(&table_info_value.table_info, &column_metadatas) {
115                    update_table_infos.push((*table_id, column_metadatas));
116                }
117            } else {
118                metadata_inconsistent_count += 1;
119                // If the logical regions have inconsistent column metadatas, it won't affect read and write.
120                // It's safe to continue if the column metadatas of the logical table are inconsistent.
121                warn!(
122                    "Found inconsistent column metadatas for table: {}, table_id: {}. Remaining the inconsistent column metadatas",
123                    table_info_value.table_info.name, table_id
124                );
125            }
126        }
127
128        let table_id = ctx.table_id();
129        let table_name = ctx.table_name();
130        info!(
131            "Resolving table metadatas for physical table: {}, table_id: {}, updating table infos: {:?}, creating tables: {:?}",
132            table_name,
133            table_id,
134            update_table_infos
135                .iter()
136                .map(|(table_id, _)| *table_id)
137                .collect::<Vec<_>>(),
138            create_tables
139                .iter()
140                .map(|(table_id, _)| *table_id)
141                .collect::<Vec<_>>(),
142        );
143        ctx.persistent_ctx.update_table_infos = update_table_infos;
144        ctx.persistent_ctx.create_tables = create_tables;
145        // Update metrics.
146        let metrics = ctx.mut_metrics();
147        metrics.column_metadata_consistent_count = metadata_consistent_count;
148        metrics.column_metadata_inconsistent_count = metadata_inconsistent_count;
149        metrics.create_tables_count = create_tables_count;
150        Ok((Box::new(ReconcileRegions), Status::executing(true)))
151    }
152
153    fn as_any(&self) -> &dyn Any {
154        self
155    }
156}