meta_srv/procedure/repartition/group/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::sync_region::SyncRegion;
26use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
27use crate::procedure::repartition::group::{
28    Context, GroupId, GroupPrepareResult, State, region_routes,
29};
30use crate::procedure::repartition::plan::RegionDescriptor;
31
/// Repartition group state that resolves the group's source and target region routes and
/// stores the outcome as the group prepare result, before handing over to [`SyncRegion`] or
/// [`UpdateMetadata::ApplyStaging`].
///
/// The state itself carries no data; everything it reads and writes lives in the group
/// [`Context`].
#[derive(Debug, Serialize, Deserialize)]
pub struct RepartitionStart;
34
35/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor.
36fn ensure_region_route_expr_match(
37    region_route: &RegionRoute,
38    region_descriptor: &RegionDescriptor,
39) -> Result<RegionRoute> {
40    let actual = &region_route.region.partition_expr;
41    let expected = region_descriptor
42        .partition_expr
43        .as_json_str()
44        .context(error::SerializePartitionExprSnafu)?;
45    ensure!(
46        actual == &expected,
47        error::PartitionExprMismatchSnafu {
48            region_id: region_route.region.id,
49            expected,
50            actual,
51        }
52    );
53    Ok(region_route.clone())
54}
55
56impl RepartitionStart {
57    /// Ensures that both source and target regions are present in the region routes.
58    ///
59    /// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
60    fn ensure_route_present(
61        group_id: GroupId,
62        region_routes: &[RegionRoute],
63        sources: &[RegionDescriptor],
64        targets: &[RegionDescriptor],
65    ) -> Result<GroupPrepareResult> {
66        ensure!(
67            !sources.is_empty(),
68            error::UnexpectedSnafu {
69                violated: "Sources are empty"
70            }
71        );
72
73        let region_routes_map = region_routes
74            .iter()
75            .map(|r| (r.region.id, r))
76            .collect::<HashMap<_, _>>();
77        let source_region_routes = sources
78            .iter()
79            .map(|s| {
80                region_routes_map
81                    .get(&s.region_id)
82                    .context(error::RepartitionSourceRegionMissingSnafu {
83                        group_id,
84                        region_id: s.region_id,
85                    })
86                    .and_then(|r| ensure_region_route_expr_match(r, s))
87            })
88            .collect::<Result<Vec<_>>>()?;
89        let target_region_routes = targets
90            .iter()
91            .map(|t| {
92                region_routes_map
93                    .get(&t.region_id)
94                    .context(error::RepartitionTargetRegionMissingSnafu {
95                        group_id,
96                        region_id: t.region_id,
97                    })
98                    .map(|r| (*r).clone())
99            })
100            .collect::<Result<Vec<_>>>()?;
101        for target_region_route in &target_region_routes {
102            ensure!(
103                target_region_route.leader_peer.is_some(),
104                error::UnexpectedSnafu {
105                    violated: format!(
106                        "Leader peer is not set for region: {}",
107                        target_region_route.region.id
108                    ),
109                }
110            );
111        }
112        let central_region = sources[0].region_id;
113        let central_region_datanode = source_region_routes[0]
114            .leader_peer
115            .as_ref()
116            .context(error::UnexpectedSnafu {
117                violated: format!(
118                    "Leader peer is not set for central region: {}",
119                    central_region
120                ),
121            })?
122            .clone();
123
124        Ok(GroupPrepareResult {
125            source_routes: source_region_routes,
126            target_routes: target_region_routes,
127            central_region,
128            central_region_datanode,
129        })
130    }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde]
135impl State for RepartitionStart {
136    /// Captures the group prepare result.
137    ///
138    /// Retry:
139    /// - Failed to get the table route.
140    ///
141    /// Abort
142    /// - Table route not found.
143    /// - Table route is not physical.
144    /// - Failed to ensure the route is present.
145    /// - Failed to capture the group prepare result.
146    async fn next(
147        &mut self,
148        ctx: &mut Context,
149        _procedure_ctx: &ProcedureContext,
150    ) -> Result<(Box<dyn State>, Status)> {
151        if ctx.persistent_ctx.group_prepare_result.is_some() {
152            return Ok((
153                Box::new(UpdateMetadata::ApplyStaging),
154                Status::executing(true),
155            ));
156        }
157        let table_id = ctx.persistent_ctx.table_id;
158        let group_id = ctx.persistent_ctx.group_id;
159        let table_route_value = ctx.get_table_route_value().await?.into_inner();
160        let region_routes = region_routes(table_id, &table_route_value)?;
161        let group_prepare_result = Self::ensure_route_present(
162            group_id,
163            region_routes,
164            &ctx.persistent_ctx.sources,
165            &ctx.persistent_ctx.targets,
166        )?;
167        ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
168        debug!(
169            "Repartition group {}: captured {} sources, {} targets",
170            group_id,
171            ctx.persistent_ctx.sources.len(),
172            ctx.persistent_ctx.targets.len()
173        );
174
175        if ctx.persistent_ctx.sync_region {
176            let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
177            let allocated_region_ids: HashSet<_> = ctx
178                .persistent_ctx
179                .allocated_region_ids
180                .iter()
181                .copied()
182                .collect();
183            let region_routes: Vec<_> = prepare_result
184                .target_routes
185                .iter()
186                .filter(|route| allocated_region_ids.contains(&route.region.id))
187                .cloned()
188                .collect();
189            if !region_routes.is_empty() {
190                return Ok((
191                    Box::new(SyncRegion { region_routes }),
192                    Status::executing(true),
193                ));
194            }
195        }
196
197        Ok((
198            Box::new(UpdateMetadata::ApplyStaging),
199            Status::executing(true),
200        ))
201    }
202
203    fn as_any(&self) -> &dyn Any {
204        self
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use crate::error::Error;
    use crate::procedure::repartition::group::repartition_start::RepartitionStart;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Descriptor of the source region (region 1 of table 1024) shared by all tests.
    fn source_descriptor() -> RegionDescriptor {
        RegionDescriptor {
            region_id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 100),
        }
    }

    /// Descriptor of the target region (region 2 of table 1024) shared by all tests.
    fn target_descriptor() -> RegionDescriptor {
        RegionDescriptor {
            region_id: RegionId::new(1024, 2),
            partition_expr: range_expr("x", 0, 10),
        }
    }

    /// Wraps `region` in a route whose leader is peer 1.
    fn route_with_leader(region: Region) -> RegionRoute {
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }
    }

    /// Runs `ensure_route_present` for the shared descriptors against `region_routes` and
    /// returns the error it is expected to produce.
    fn prepare_error(region_routes: Vec<RegionRoute>) -> Error {
        RepartitionStart::ensure_route_present(
            Uuid::new_v4(),
            &region_routes,
            &[source_descriptor()],
            &[target_descriptor()],
        )
        .unwrap_err()
    }

    #[test]
    fn test_ensure_route_present_missing_source_region() {
        // Only the target region has a route.
        let region_routes = vec![route_with_leader(Region {
            id: RegionId::new(1024, 2),
            ..Default::default()
        })];

        let err = prepare_error(region_routes);
        assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_partition_expr_mismatch() {
        // The source route exists, but its expression differs from the descriptor's.
        let region_routes = vec![route_with_leader(Region {
            id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
            ..Default::default()
        })];

        let err = prepare_error(region_routes);
        assert_matches!(err, Error::PartitionExprMismatch { .. });
    }

    #[test]
    fn test_ensure_route_present_missing_target_region() {
        // The source route matches, but the target region has no route.
        let region_routes = vec![route_with_leader(Region {
            id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
            ..Default::default()
        })];

        let err = prepare_error(region_routes);
        assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
    }
}