common_meta/reconciliation/reconcile_table/reconcile_regions.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use api::v1::column_def::try_as_column_def;
19use api::v1::region::region_request::Body;
20use api::v1::region::{
21    alter_request, AlterRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, SyncColumns,
22};
23use api::v1::{ColumnDef, SemanticType};
24use async_trait::async_trait;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::info;
27use common_telemetry::tracing_context::TracingContext;
28use futures::future;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
33use store_api::storage::{ColumnId, RegionId};
34
35use crate::ddl::utils::{add_peer_context_if_needed, extract_column_metadatas};
36use crate::error::{ConvertColumnDefSnafu, Result, UnexpectedSnafu};
37use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd;
38use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo;
39use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
40use crate::rpc::router::{find_leaders, region_distribution};
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct ReconcileRegions {
44    column_metadatas: Vec<ColumnMetadata>,
45    region_ids: HashSet<RegionId>,
46}
47
48impl ReconcileRegions {
49    pub fn new(column_metadatas: Vec<ColumnMetadata>, region_ids: Vec<RegionId>) -> Self {
50        Self {
51            column_metadatas,
52            region_ids: region_ids.into_iter().collect(),
53        }
54    }
55}
56
57#[async_trait]
58#[typetag::serde]
59impl State for ReconcileRegions {
60    async fn next(
61        &mut self,
62        ctx: &mut ReconcileTableContext,
63        _procedure_ctx: &ProcedureContext,
64    ) -> Result<(Box<dyn State>, Status)> {
65        let table_meta = ctx.build_table_meta(&self.column_metadatas)?;
66        ctx.volatile_ctx.table_meta = Some(table_meta);
67        let table_id = ctx.table_id();
68        let table_name = ctx.table_name();
69
70        let primary_keys = self
71            .column_metadatas
72            .iter()
73            .filter(|c| c.semantic_type == SemanticType::Tag)
74            .map(|c| c.column_schema.name.to_string())
75            .collect::<HashSet<_>>();
76        let column_defs = self
77            .column_metadatas
78            .iter()
79            .map(|c| {
80                let column_def = try_as_column_def(
81                    &c.column_schema,
82                    primary_keys.contains(&c.column_schema.name),
83                )
84                .context(ConvertColumnDefSnafu {
85                    column: &c.column_schema.name,
86                })?;
87
88                Ok((c.column_id, column_def))
89            })
90            .collect::<Result<Vec<_>>>()?;
91
92        // Sends sync column metadatas to datanode.
93        // Safety: The physical table route is set in `ReconciliationStart` state.
94        let region_routes = &ctx
95            .persistent_ctx
96            .physical_table_route
97            .as_ref()
98            .unwrap()
99            .region_routes;
100        let region_distribution = region_distribution(region_routes);
101        let leaders = find_leaders(region_routes)
102            .into_iter()
103            .map(|p| (p.id, p))
104            .collect::<HashMap<_, _>>();
105        let mut sync_column_tsks = Vec::with_capacity(self.region_ids.len());
106        for (datanode_id, region_role_set) in region_distribution {
107            if region_role_set.leader_regions.is_empty() {
108                continue;
109            }
110            // Safety: It contains all leaders in the region routes.
111            let peer = leaders.get(&datanode_id).unwrap();
112            for region_id in region_role_set.leader_regions {
113                let region_id = RegionId::new(ctx.persistent_ctx.table_id, region_id);
114                if self.region_ids.contains(&region_id) {
115                    let requester = ctx.node_manager.datanode(peer).await;
116                    let request = make_alter_region_request(region_id, &column_defs);
117                    let peer = peer.clone();
118
119                    sync_column_tsks.push(async move {
120                        requester
121                            .handle(request)
122                            .await
123                            .map_err(add_peer_context_if_needed(peer))
124                    });
125                }
126            }
127        }
128
129        let mut results = future::join_all(sync_column_tsks)
130            .await
131            .into_iter()
132            .collect::<Result<Vec<_>>>()?;
133
134        // Ensures all the column metadatas are the same.
135        let column_metadatas =
136            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?.context(
137                UnexpectedSnafu {
138                    err_msg: format!(
139                        "The table column metadata schemas from datanodes are not the same, table: {}, table_id: {}",
140                        table_name,
141                        table_id
142                    ),
143                },
144            )?;
145
146        // Checks all column metadatas are consistent, and updates the table info if needed.
147        if column_metadatas != self.column_metadatas {
148            info!("Datanode column metadatas are not consistent with metasrv, updating metasrv's column metadatas, table: {}, table_id: {}", table_name, table_id);
149            // Safety: fetched in the above.
150            let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap();
151            return Ok((
152                Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
153                Status::executing(true),
154            ));
155        }
156
157        Ok((Box::new(ReconciliationEnd), Status::executing(false)))
158    }
159
160    fn as_any(&self) -> &dyn Any {
161        self
162    }
163}
164
165/// Makes an alter region request to sync columns.
166fn make_alter_region_request(
167    region_id: RegionId,
168    column_defs: &[(ColumnId, ColumnDef)],
169) -> RegionRequest {
170    let kind = alter_request::Kind::SyncColumns(to_region_sync_columns(column_defs));
171
172    let alter_request = AlterRequest {
173        region_id: region_id.as_u64(),
174        schema_version: 0,
175        kind: Some(kind),
176    };
177
178    RegionRequest {
179        header: Some(RegionRequestHeader {
180            tracing_context: TracingContext::from_current_span().to_w3c(),
181            ..Default::default()
182        }),
183        body: Some(Body::Alter(alter_request)),
184    }
185}
186
187fn to_region_sync_columns(column_defs: &[(ColumnId, ColumnDef)]) -> SyncColumns {
188    let region_column_defs = column_defs
189        .iter()
190        .map(|(column_id, column_def)| RegionColumnDef {
191            column_id: *column_id,
192            column_def: Some(column_def.clone()),
193        })
194        .collect::<Vec<_>>();
195
196    SyncColumns {
197        column_defs: region_column_defs,
198    }
199}