common_meta/reconciliation/reconcile_table/
reconciliation_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::ensure;
21
22use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
23use crate::error::{self, Result};
24use crate::metrics::{self};
25use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata;
26use crate::reconciliation::reconcile_table::{
27    ReconcileTableContext, ReconcileTableProcedure, State,
28};
29
30/// The start state of the reconciliation procedure.
31///
32/// This state is used to prepare the table for reconciliation.
33/// It will:
34/// 1. Check the table id and table name consistency.
35/// 2. Ensures the table is a physical table.
36/// 3. List the region metadatas for the physical table.
37#[derive(Debug, Serialize, Deserialize)]
38pub struct ReconciliationStart;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for ReconciliationStart {
43    async fn next(
44        &mut self,
45        ctx: &mut ReconcileTableContext,
46        procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        let table_id = ctx.table_id();
49        let table_name = ctx.table_name();
50
51        let (physical_table_id, physical_table_route) = ctx
52            .table_metadata_manager
53            .table_route_manager()
54            .get_physical_table_route(table_id)
55            .await?;
56        ensure!(
57            physical_table_id == table_id,
58            error::UnexpectedSnafu {
59                err_msg: format!(
60                    "Reconcile table only works for physical table, but got logical table: {}, table_id: {}",
61                    table_name, table_id
62                ),
63            }
64        );
65
66        info!(
67            "Reconciling table: {}, table_id: {}, procedure_id: {}",
68            table_name, table_id, procedure_ctx.procedure_id
69        );
70        // TODO(weny): Repairs the table route if needed.
71        let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
72
73        let region_metadatas = {
74            let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
75                .with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
76                .start_timer();
77            // Always list region metadatas for the physical table.
78            region_metadata_lister
79                .list(physical_table_id, &physical_table_route.region_routes)
80                .await?
81        };
82
83        ensure!(!region_metadatas.is_empty(), {
84            metrics::METRIC_META_RECONCILIATION_STATS
85                .with_label_values(&[
86                    ReconcileTableProcedure::TYPE_NAME,
87                    metrics::TABLE_TYPE_PHYSICAL,
88                    metrics::STATS_TYPE_NO_REGION_METADATA,
89                ])
90                .inc();
91
92            error::UnexpectedSnafu {
93                err_msg: format!(
94                    "No region metadata found for table: {}, table_id: {}",
95                    table_name, table_id
96                ),
97            }
98        });
99
100        ensure!(region_metadatas.iter().all(|r| r.is_some()), {
101            metrics::METRIC_META_RECONCILIATION_STATS
102                .with_label_values(&[
103                    ReconcileTableProcedure::TYPE_NAME,
104                    metrics::TABLE_TYPE_PHYSICAL,
105                    metrics::STATS_TYPE_REGION_NOT_OPEN,
106                ])
107                .inc();
108
109            error::UnexpectedSnafu {
110                err_msg: format!(
111                    "Some regions are not opened, table: {}, table_id: {}",
112                    table_name, table_id
113                ),
114            }
115        });
116
117        // Persist the physical table route.
118        // TODO(weny): refetch the physical table route if repair is needed.
119        ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
120        let region_metadatas = region_metadatas.into_iter().map(|r| r.unwrap()).collect();
121        Ok((
122            Box::new(ResolveColumnMetadata::new(
123                ctx.persistent_ctx.resolve_strategy,
124                region_metadatas,
125            )),
126            // We don't persist the state of this step.
127            Status::executing(false),
128        ))
129    }
130
131    fn as_any(&self) -> &dyn Any {
132        self
133    }
134}