meta_srv/procedure/repartition/group/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::{
26    Context, GroupId, GroupPrepareResult, State, region_routes,
27};
28use crate::procedure::repartition::plan::RegionDescriptor;
29
30#[derive(Debug, Serialize, Deserialize)]
31pub struct RepartitionStart;
32
33/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor.
34fn ensure_region_route_expr_match(
35    region_route: &RegionRoute,
36    region_descriptor: &RegionDescriptor,
37) -> Result<RegionRoute> {
38    let actual = &region_route.region.partition_expr;
39    let expected = region_descriptor
40        .partition_expr
41        .as_json_str()
42        .context(error::SerializePartitionExprSnafu)?;
43    ensure!(
44        actual == &expected,
45        error::PartitionExprMismatchSnafu {
46            region_id: region_route.region.id,
47            expected,
48            actual,
49        }
50    );
51    Ok(region_route.clone())
52}
53
54impl RepartitionStart {
55    /// Ensures that both source and target regions are present in the region routes.
56    ///
57    /// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
58    #[allow(dead_code)]
59    fn ensure_route_present(
60        group_id: GroupId,
61        region_routes: &[RegionRoute],
62        sources: &[RegionDescriptor],
63        targets: &[RegionDescriptor],
64    ) -> Result<GroupPrepareResult> {
65        ensure!(
66            !sources.is_empty(),
67            error::UnexpectedSnafu {
68                violated: "Sources are empty"
69            }
70        );
71
72        let region_routes_map = region_routes
73            .iter()
74            .map(|r| (r.region.id, r))
75            .collect::<HashMap<_, _>>();
76        let source_region_routes = sources
77            .iter()
78            .map(|s| {
79                region_routes_map
80                    .get(&s.region_id)
81                    .context(error::RepartitionSourceRegionMissingSnafu {
82                        group_id,
83                        region_id: s.region_id,
84                    })
85                    .and_then(|r| ensure_region_route_expr_match(r, s))
86            })
87            .collect::<Result<Vec<_>>>()?;
88        let target_region_routes = targets
89            .iter()
90            .map(|t| {
91                region_routes_map
92                    .get(&t.region_id)
93                    .context(error::RepartitionTargetRegionMissingSnafu {
94                        group_id,
95                        region_id: t.region_id,
96                    })
97                    .map(|r| (*r).clone())
98            })
99            .collect::<Result<Vec<_>>>()?;
100        for target_region_route in &target_region_routes {
101            ensure!(
102                target_region_route.leader_peer.is_some(),
103                error::UnexpectedSnafu {
104                    violated: format!(
105                        "Leader peer is not set for region: {}",
106                        target_region_route.region.id
107                    ),
108                }
109            );
110        }
111        let central_region = sources[0].region_id;
112        let central_region_datanode_id = source_region_routes[0]
113            .leader_peer
114            .as_ref()
115            .context(error::UnexpectedSnafu {
116                violated: format!(
117                    "Leader peer is not set for central region: {}",
118                    central_region
119                ),
120            })?
121            .id;
122
123        Ok(GroupPrepareResult {
124            source_routes: source_region_routes,
125            target_routes: target_region_routes,
126            central_region,
127            central_region_datanode_id,
128        })
129    }
130
131    #[allow(dead_code)]
132    fn next_state() -> (Box<dyn State>, Status) {
133        // TODO(weny): change it later.
134        (Box::new(RepartitionStart), Status::executing(true))
135    }
136}
137
138#[async_trait::async_trait]
139#[typetag::serde]
140impl State for RepartitionStart {
141    /// Captures the group prepare result.
142    ///
143    /// Retry:
144    /// - Failed to get the table route.
145    ///
146    /// Abort
147    /// - Table route not found.
148    /// - Table route is not physical.
149    /// - Failed to ensure the route is present.
150    /// - Failed to capture the group prepare result.
151    async fn next(
152        &mut self,
153        ctx: &mut Context,
154        _procedure_ctx: &ProcedureContext,
155    ) -> Result<(Box<dyn State>, Status)> {
156        if ctx.persistent_ctx.group_prepare_result.is_some() {
157            return Ok(Self::next_state());
158        }
159        let table_id = ctx.persistent_ctx.table_id;
160        let group_id = ctx.persistent_ctx.group_id;
161        let table_route_value = ctx.get_table_route_value().await?.into_inner();
162        let region_routes = region_routes(table_id, &table_route_value)?;
163        let group_prepare_result = Self::ensure_route_present(
164            group_id,
165            region_routes,
166            &ctx.persistent_ctx.sources,
167            &ctx.persistent_ctx.targets,
168        )?;
169        ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
170        debug!(
171            "Repartition group {}: captured {} sources, {} targets",
172            group_id,
173            ctx.persistent_ctx.sources.len(),
174            ctx.persistent_ctx.targets.len()
175        );
176
177        Ok(Self::next_state())
178    }
179
180    fn as_any(&self) -> &dyn Any {
181        self
182    }
183}
184
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use crate::error::Error;
    use crate::procedure::repartition::group::repartition_start::RepartitionStart;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds the (source, target) descriptor pair shared by every test in this module.
    fn source_and_target() -> (RegionDescriptor, RegionDescriptor) {
        let source = RegionDescriptor {
            region_id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 100),
        };
        let target = RegionDescriptor {
            region_id: RegionId::new(1024, 2),
            partition_expr: range_expr("x", 0, 10),
        };
        (source, target)
    }

    /// Runs `ensure_route_present` against a route table holding only `region`
    /// (led by peer 1) and returns the error it is expected to produce.
    fn route_check_error(region: Region) -> Error {
        let routes = vec![RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }];
        let (source, target) = source_and_target();
        RepartitionStart::ensure_route_present(Uuid::new_v4(), &routes, &[source], &[target])
            .unwrap_err()
    }

    #[test]
    fn test_ensure_route_present_missing_source_region() {
        // Only the target region has a route.
        let err = route_check_error(Region {
            id: RegionId::new(1024, 2),
            ..Default::default()
        });
        assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_partition_expr_mismatch() {
        // The source route exists but stores a different partition expression.
        let err = route_check_error(Region {
            id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
            ..Default::default()
        });
        assert_matches!(err, Error::PartitionExprMismatch { .. });
    }

    #[test]
    fn test_ensure_route_present_missing_target_region() {
        // The source route matches, but the target region has no route.
        let err = route_check_error(Region {
            id: RegionId::new(1024, 1),
            partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
            ..Default::default()
        });
        assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
    }
}