meta_srv/procedure/repartition/group/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::{
26    Context, GroupId, GroupPrepareResult, State, region_routes,
27};
28use crate::procedure::repartition::plan::RegionDescriptor;
29
30#[derive(Debug, Serialize, Deserialize)]
31pub struct RepartitionStart;
32
33/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor.
34fn ensure_region_route_expr_match(
35    region_route: &RegionRoute,
36    region_descriptor: &RegionDescriptor,
37) -> Result<RegionRoute> {
38    let actual = &region_route.region.partition_expr;
39    let expected = region_descriptor
40        .partition_expr
41        .as_json_str()
42        .context(error::SerializePartitionExprSnafu)?;
43    ensure!(
44        actual == &expected,
45        error::PartitionExprMismatchSnafu {
46            region_id: region_route.region.id,
47            expected,
48            actual,
49        }
50    );
51    Ok(region_route.clone())
52}
53
54impl RepartitionStart {
55    /// Ensures that both source and target regions are present in the region routes.
56    ///
57    /// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
58    #[allow(dead_code)]
59    fn ensure_route_present(
60        group_id: GroupId,
61        region_routes: &[RegionRoute],
62        sources: &[RegionDescriptor],
63        targets: &[RegionDescriptor],
64    ) -> Result<GroupPrepareResult> {
65        ensure!(
66            !sources.is_empty(),
67            error::UnexpectedSnafu {
68                violated: "Sources are empty"
69            }
70        );
71
72        let region_routes_map = region_routes
73            .iter()
74            .map(|r| (r.region.id, r))
75            .collect::<HashMap<_, _>>();
76        let source_region_routes = sources
77            .iter()
78            .map(|s| {
79                region_routes_map
80                    .get(&s.region_id)
81                    .context(error::RepartitionSourceRegionMissingSnafu {
82                        group_id,
83                        region_id: s.region_id,
84                    })
85                    .and_then(|r| ensure_region_route_expr_match(r, s))
86            })
87            .collect::<Result<Vec<_>>>()?;
88        let target_region_routes = targets
89            .iter()
90            .map(|t| {
91                region_routes_map
92                    .get(&t.region_id)
93                    .context(error::RepartitionTargetRegionMissingSnafu {
94                        group_id,
95                        region_id: t.region_id,
96                    })
97                    .map(|r| (*r).clone())
98            })
99            .collect::<Result<Vec<_>>>()?;
100        let central_region = sources[0].region_id;
101        let central_region_datanode_id = source_region_routes[0]
102            .leader_peer
103            .as_ref()
104            .context(error::UnexpectedSnafu {
105                violated: format!(
106                    "Leader peer is not set for central region: {}",
107                    central_region
108                ),
109            })?
110            .id;
111
112        Ok(GroupPrepareResult {
113            source_routes: source_region_routes,
114            target_routes: target_region_routes,
115            central_region,
116            central_region_datanode_id,
117        })
118    }
119
120    #[allow(dead_code)]
121    fn next_state() -> (Box<dyn State>, Status) {
122        // TODO(weny): change it later.
123        (Box::new(RepartitionStart), Status::executing(true))
124    }
125}
126
127#[async_trait::async_trait]
128#[typetag::serde]
129impl State for RepartitionStart {
130    /// Captures the group prepare result.
131    ///
132    /// Retry:
133    /// - Failed to get the table route.
134    ///
135    /// Abort
136    /// - Table route not found.
137    /// - Table route is not physical.
138    /// - Failed to ensure the route is present.
139    /// - Failed to capture the group prepare result.
140    async fn next(
141        &mut self,
142        ctx: &mut Context,
143        _procedure_ctx: &ProcedureContext,
144    ) -> Result<(Box<dyn State>, Status)> {
145        if ctx.persistent_ctx.group_prepare_result.is_some() {
146            return Ok(Self::next_state());
147        }
148        let table_id = ctx.persistent_ctx.table_id;
149        let group_id = ctx.persistent_ctx.group_id;
150        let table_route_value = ctx.get_table_route_value().await?.into_inner();
151        let region_routes = region_routes(table_id, &table_route_value)?;
152        let group_prepare_result = Self::ensure_route_present(
153            group_id,
154            region_routes,
155            &ctx.persistent_ctx.sources,
156            &ctx.persistent_ctx.targets,
157        )?;
158        ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
159        debug!(
160            "Repartition group {}: captured {} sources, {} targets",
161            group_id,
162            ctx.persistent_ctx.sources.len(),
163            ctx.persistent_ctx.targets.len()
164        );
165
166        Ok(Self::next_state())
167    }
168
169    fn as_any(&self) -> &dyn Any {
170        self
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use std::assert_matches::assert_matches;
177
178    use common_meta::peer::Peer;
179    use common_meta::rpc::router::{Region, RegionRoute};
180    use store_api::storage::RegionId;
181    use uuid::Uuid;
182
183    use crate::error::Error;
184    use crate::procedure::repartition::group::repartition_start::RepartitionStart;
185    use crate::procedure::repartition::plan::RegionDescriptor;
186    use crate::procedure::repartition::test_util::range_expr;
187
188    #[test]
189    fn test_ensure_route_present_missing_source_region() {
190        let source_region = RegionDescriptor {
191            region_id: RegionId::new(1024, 1),
192            partition_expr: range_expr("x", 0, 100),
193        };
194        let target_region = RegionDescriptor {
195            region_id: RegionId::new(1024, 2),
196            partition_expr: range_expr("x", 0, 10),
197        };
198        let region_routes = vec![RegionRoute {
199            region: Region {
200                id: RegionId::new(1024, 2),
201                ..Default::default()
202            },
203            leader_peer: Some(Peer::empty(1)),
204            ..Default::default()
205        }];
206        let err = RepartitionStart::ensure_route_present(
207            Uuid::new_v4(),
208            &region_routes,
209            &[source_region],
210            &[target_region],
211        )
212        .unwrap_err();
213        assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
214    }
215
216    #[test]
217    fn test_ensure_route_present_partition_expr_mismatch() {
218        let source_region = RegionDescriptor {
219            region_id: RegionId::new(1024, 1),
220            partition_expr: range_expr("x", 0, 100),
221        };
222        let target_region = RegionDescriptor {
223            region_id: RegionId::new(1024, 2),
224            partition_expr: range_expr("x", 0, 10),
225        };
226        let region_routes = vec![RegionRoute {
227            region: Region {
228                id: RegionId::new(1024, 1),
229                partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
230                ..Default::default()
231            },
232            leader_peer: Some(Peer::empty(1)),
233            ..Default::default()
234        }];
235        let err = RepartitionStart::ensure_route_present(
236            Uuid::new_v4(),
237            &region_routes,
238            &[source_region],
239            &[target_region],
240        )
241        .unwrap_err();
242        assert_matches!(err, Error::PartitionExprMismatch { .. });
243    }
244
245    #[test]
246    fn test_ensure_route_present_missing_target_region() {
247        let source_region = RegionDescriptor {
248            region_id: RegionId::new(1024, 1),
249            partition_expr: range_expr("x", 0, 100),
250        };
251        let target_region = RegionDescriptor {
252            region_id: RegionId::new(1024, 2),
253            partition_expr: range_expr("x", 0, 10),
254        };
255        let region_routes = vec![RegionRoute {
256            region: Region {
257                id: RegionId::new(1024, 1),
258                partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
259                ..Default::default()
260            },
261            leader_peer: Some(Peer::empty(1)),
262            ..Default::default()
263        }];
264        let err = RepartitionStart::ensure_route_present(
265            Uuid::new_v4(),
266            &region_routes,
267            &[source_region],
268            &[target_region],
269        )
270        .unwrap_err();
271        assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
272    }
273}