common_meta/reconciliation/reconcile_logical_tables/
reconciliation_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt};
21use store_api::storage::TableId;
22use table::table_name::TableName;
23
24use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
25use crate::ddl::utils::table_id::get_all_table_ids_by_names;
26use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id;
27use crate::error::{self, Result};
28use crate::metrics;
29use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas;
30use crate::reconciliation::reconcile_logical_tables::{
31    ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
32};
33use crate::reconciliation::utils::check_column_metadatas_consistent;
34
35/// The start state of the reconciliation procedure.
36#[derive(Debug, Serialize, Deserialize)]
37pub struct ReconciliationStart;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for ReconciliationStart {
42    async fn next(
43        &mut self,
44        ctx: &mut ReconcileLogicalTablesContext,
45        procedure_ctx: &ProcedureContext,
46    ) -> Result<(Box<dyn State>, Status)> {
47        let table_id = ctx.table_id();
48        let table_name = ctx.table_name();
49        let (physical_table_id, physical_table_route) = ctx
50            .table_metadata_manager
51            .table_route_manager()
52            .get_physical_table_route(table_id)
53            .await?;
54        ensure!(
55            physical_table_id == table_id,
56            error::UnexpectedSnafu {
57                err_msg: format!(
58                    "Expected physical table: {}, but it's a logical table of table: {}",
59                    table_name, physical_table_id
60                ),
61            }
62        );
63
64        let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
65        let region_metadatas = {
66            let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
67                .with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
68                .start_timer();
69            region_metadata_lister
70                .list(physical_table_id, &physical_table_route.region_routes)
71                .await?
72        };
73
74        ensure!(!region_metadatas.is_empty(), {
75            metrics::METRIC_META_RECONCILIATION_STATS
76                .with_label_values(&[
77                    ReconcileLogicalTablesProcedure::TYPE_NAME,
78                    metrics::TABLE_TYPE_PHYSICAL,
79                    metrics::STATS_TYPE_NO_REGION_METADATA,
80                ])
81                .inc();
82
83            error::UnexpectedSnafu {
84                err_msg: format!(
85                    "No region metadata found for physical table: {}, table_id: {}",
86                    table_name, table_id
87                ),
88            }
89        });
90
91        ensure!(region_metadatas.iter().all(|r| r.is_some()), {
92            metrics::METRIC_META_RECONCILIATION_STATS
93                .with_label_values(&[
94                    ReconcileLogicalTablesProcedure::TYPE_NAME,
95                    metrics::TABLE_TYPE_PHYSICAL,
96                    metrics::STATS_TYPE_REGION_NOT_OPEN,
97                ])
98                .inc();
99            error::UnexpectedSnafu {
100                err_msg: format!(
101                    "Some regions of the physical table are not open. physical table: {}, table_id: {}",
102                    table_name, table_id
103                ),
104            }
105        });
106
107        // Safety: checked above
108        let region_metadatas = region_metadatas
109            .into_iter()
110            .map(|r| r.unwrap())
111            .collect::<Vec<_>>();
112        let _region_metadata = check_column_metadatas_consistent(&region_metadatas).context(
113            error::UnexpectedSnafu {
114                err_msg: format!(
115                    "Column metadatas are not consistent for physical table: {}, table_id: {}",
116                    table_name, table_id
117                ),
118            },
119        )?;
120
121        // TODO(weny): ensure all columns in region metadata can be found in table info.
122        // Validates the logical tables.
123        Self::validate_schema(&ctx.persistent_ctx.logical_tables)?;
124        let table_refs = ctx
125            .persistent_ctx
126            .logical_tables
127            .iter()
128            .map(|t| t.table_ref())
129            .collect::<Vec<_>>();
130        let table_ids = get_all_table_ids_by_names(
131            ctx.table_metadata_manager.table_name_manager(),
132            &table_refs,
133        )
134        .await?;
135        Self::validate_logical_table_routes(ctx, &table_ids).await?;
136
137        let table_name = ctx.table_name();
138        info!(
139            "Starting reconciliation for logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}",
140            table_ids, table_id, table_name, procedure_ctx.procedure_id
141        );
142
143        ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
144        ctx.persistent_ctx.logical_table_ids = table_ids;
145        Ok((Box::new(ResolveTableMetadatas), Status::executing(true)))
146    }
147
148    fn as_any(&self) -> &dyn Any {
149        self
150    }
151}
152
153impl ReconciliationStart {
154    /// Validates all the logical tables have the same catalog and schema.
155    fn validate_schema(logical_tables: &[TableName]) -> Result<()> {
156        let is_same_schema = logical_tables.windows(2).all(|pair| {
157            pair[0].catalog_name == pair[1].catalog_name
158                && pair[0].schema_name == pair[1].schema_name
159        });
160
161        ensure!(
162            is_same_schema,
163            error::UnexpectedSnafu {
164                err_msg: "The logical tables have different schemas",
165            }
166        );
167
168        Ok(())
169    }
170
171    async fn validate_logical_table_routes(
172        ctx: &mut ReconcileLogicalTablesContext,
173        table_ids: &[TableId],
174    ) -> Result<()> {
175        let all_logical_table_routes_have_same_physical_id =
176            all_logical_table_routes_have_same_physical_id(
177                ctx.table_metadata_manager.table_route_manager(),
178                table_ids,
179                ctx.table_id(),
180            )
181            .await?;
182
183        ensure!(
184            all_logical_table_routes_have_same_physical_id,
185            error::UnexpectedSnafu {
186                err_msg: "All the logical tables should have the same physical table id",
187            }
188        );
189
190        Ok(())
191    }
192}