common_meta/reconciliation/reconcile_logical_tables/
reconcile_regions.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use common_telemetry::tracing_context::TracingContext;
22use futures::future;
23use serde::{Deserialize, Serialize};
24use store_api::storage::{RegionId, RegionNumber, TableId};
25use table::metadata::RawTableInfo;
26
27use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path};
28use crate::ddl::{build_template_from_raw_table_info, CreateRequestBuilder};
29use crate::error::Result;
30use crate::reconciliation::reconcile_logical_tables::update_table_infos::UpdateTableInfos;
31use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
32use crate::rpc::router::{find_leaders, region_distribution};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub struct ReconcileRegions;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for ReconcileRegions {
40    async fn next(
41        &mut self,
42        ctx: &mut ReconcileLogicalTablesContext,
43        _procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        if ctx.persistent_ctx.create_tables.is_empty() {
46            return Ok((Box::new(UpdateTableInfos), Status::executing(false)));
47        }
48
49        // Safety: previous steps ensure the physical table route is set.
50        let region_routes = &ctx
51            .persistent_ctx
52            .physical_table_route
53            .as_ref()
54            .unwrap()
55            .region_routes;
56
57        let region_distribution = region_distribution(region_routes);
58        let leaders = find_leaders(region_routes)
59            .into_iter()
60            .map(|p| (p.id, p))
61            .collect::<HashMap<_, _>>();
62        let mut create_table_tasks = Vec::with_capacity(leaders.len());
63        for (datanode_id, region_role_set) in region_distribution {
64            if region_role_set.leader_regions.is_empty() {
65                continue;
66            }
67            // Safety: It contains all leaders in the region routes.
68            let peer = leaders.get(&datanode_id).unwrap().clone();
69            let request = self.make_request(&region_role_set.leader_regions, ctx)?;
70            let requester = ctx.node_manager.datanode(&peer).await;
71            create_table_tasks.push(async move {
72                requester
73                    .handle(request)
74                    .await
75                    .map_err(add_peer_context_if_needed(peer))
76            });
77        }
78
79        future::join_all(create_table_tasks)
80            .await
81            .into_iter()
82            .collect::<Result<Vec<_>>>()?;
83        let table_id = ctx.table_id();
84        let table_name = ctx.table_name();
85        info!(
86            "Reconciled regions for logical tables: {:?}, physical table: {}, table_id: {}",
87            ctx.persistent_ctx
88                .create_tables
89                .iter()
90                .map(|(table_id, _)| table_id)
91                .collect::<Vec<_>>(),
92            table_id,
93            table_name
94        );
95        ctx.persistent_ctx.create_tables.clear();
96        return Ok((Box::new(UpdateTableInfos), Status::executing(true)));
97    }
98
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102}
103
104impl ReconcileRegions {
105    fn make_request(
106        &self,
107        region_numbers: &[u32],
108        ctx: &ReconcileLogicalTablesContext,
109    ) -> Result<RegionRequest> {
110        let physical_table_id = ctx.table_id();
111        let table_name = ctx.table_name();
112        let create_tables = &ctx.persistent_ctx.create_tables;
113
114        let mut requests = Vec::with_capacity(region_numbers.len() * create_tables.len());
115
116        for (table_id, table_info) in create_tables {
117            let request_builder =
118                create_region_request_from_raw_table_info(table_info, physical_table_id)?;
119            let storage_path =
120                region_storage_path(&table_name.catalog_name, &table_name.schema_name);
121            let partition_exprs = prepare_partition_exprs(ctx, *table_id);
122
123            for region_number in region_numbers {
124                let region_id = RegionId::new(*table_id, *region_number);
125
126                let one_region_request = request_builder.build_one(
127                    region_id,
128                    storage_path.clone(),
129                    &HashMap::new(),
130                    &partition_exprs,
131                );
132                requests.push(one_region_request);
133            }
134        }
135
136        Ok(RegionRequest {
137            header: Some(RegionRequestHeader {
138                tracing_context: TracingContext::from_current_span().to_w3c(),
139                ..Default::default()
140            }),
141            body: Some(region_request::Body::Creates(CreateRequests { requests })),
142        })
143    }
144}
145
146/// Creates a region request builder from a raw table info.
147fn create_region_request_from_raw_table_info(
148    raw_table_info: &RawTableInfo,
149    physical_table_id: TableId,
150) -> Result<CreateRequestBuilder> {
151    let template = build_template_from_raw_table_info(raw_table_info)?;
152    Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
153}
154
155fn prepare_partition_exprs(
156    ctx: &ReconcileLogicalTablesContext,
157    table_id: TableId,
158) -> HashMap<RegionNumber, String> {
159    ctx.persistent_ctx
160        .physical_table_route
161        .as_ref()
162        .map(|r| {
163            r.region_routes
164                .iter()
165                .filter(|r| r.region.id.table_id() == table_id)
166                .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
167                .collect::<HashMap<_, _>>()
168        })
169        .unwrap_or_default()
170}