meta_srv/procedure/repartition/group/
repartition_start.rs1use std::any::Any;
16use std::collections::HashMap;
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::{
26 Context, GroupId, GroupPrepareResult, State, region_routes,
27};
28use crate::procedure::repartition::plan::RegionDescriptor;
29
30#[derive(Debug, Serialize, Deserialize)]
31pub struct RepartitionStart;
32
33fn ensure_region_route_expr_match(
35 region_route: &RegionRoute,
36 region_descriptor: &RegionDescriptor,
37) -> Result<RegionRoute> {
38 let actual = ®ion_route.region.partition_expr;
39 let expected = region_descriptor
40 .partition_expr
41 .as_json_str()
42 .context(error::SerializePartitionExprSnafu)?;
43 ensure!(
44 actual == &expected,
45 error::PartitionExprMismatchSnafu {
46 region_id: region_route.region.id,
47 expected,
48 actual,
49 }
50 );
51 Ok(region_route.clone())
52}
53
54impl RepartitionStart {
55 #[allow(dead_code)]
59 fn ensure_route_present(
60 group_id: GroupId,
61 region_routes: &[RegionRoute],
62 sources: &[RegionDescriptor],
63 targets: &[RegionDescriptor],
64 ) -> Result<GroupPrepareResult> {
65 ensure!(
66 !sources.is_empty(),
67 error::UnexpectedSnafu {
68 violated: "Sources are empty"
69 }
70 );
71
72 let region_routes_map = region_routes
73 .iter()
74 .map(|r| (r.region.id, r))
75 .collect::<HashMap<_, _>>();
76 let source_region_routes = sources
77 .iter()
78 .map(|s| {
79 region_routes_map
80 .get(&s.region_id)
81 .context(error::RepartitionSourceRegionMissingSnafu {
82 group_id,
83 region_id: s.region_id,
84 })
85 .and_then(|r| ensure_region_route_expr_match(r, s))
86 })
87 .collect::<Result<Vec<_>>>()?;
88 let target_region_routes = targets
89 .iter()
90 .map(|t| {
91 region_routes_map
92 .get(&t.region_id)
93 .context(error::RepartitionTargetRegionMissingSnafu {
94 group_id,
95 region_id: t.region_id,
96 })
97 .map(|r| (*r).clone())
98 })
99 .collect::<Result<Vec<_>>>()?;
100 for target_region_route in &target_region_routes {
101 ensure!(
102 target_region_route.leader_peer.is_some(),
103 error::UnexpectedSnafu {
104 violated: format!(
105 "Leader peer is not set for region: {}",
106 target_region_route.region.id
107 ),
108 }
109 );
110 }
111 let central_region = sources[0].region_id;
112 let central_region_datanode_id = source_region_routes[0]
113 .leader_peer
114 .as_ref()
115 .context(error::UnexpectedSnafu {
116 violated: format!(
117 "Leader peer is not set for central region: {}",
118 central_region
119 ),
120 })?
121 .id;
122
123 Ok(GroupPrepareResult {
124 source_routes: source_region_routes,
125 target_routes: target_region_routes,
126 central_region,
127 central_region_datanode_id,
128 })
129 }
130
131 #[allow(dead_code)]
132 fn next_state() -> (Box<dyn State>, Status) {
133 (Box::new(RepartitionStart), Status::executing(true))
135 }
136}
137
138#[async_trait::async_trait]
139#[typetag::serde]
140impl State for RepartitionStart {
141 async fn next(
152 &mut self,
153 ctx: &mut Context,
154 _procedure_ctx: &ProcedureContext,
155 ) -> Result<(Box<dyn State>, Status)> {
156 if ctx.persistent_ctx.group_prepare_result.is_some() {
157 return Ok(Self::next_state());
158 }
159 let table_id = ctx.persistent_ctx.table_id;
160 let group_id = ctx.persistent_ctx.group_id;
161 let table_route_value = ctx.get_table_route_value().await?.into_inner();
162 let region_routes = region_routes(table_id, &table_route_value)?;
163 let group_prepare_result = Self::ensure_route_present(
164 group_id,
165 region_routes,
166 &ctx.persistent_ctx.sources,
167 &ctx.persistent_ctx.targets,
168 )?;
169 ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
170 debug!(
171 "Repartition group {}: captured {} sources, {} targets",
172 group_id,
173 ctx.persistent_ctx.sources.len(),
174 ctx.persistent_ctx.targets.len()
175 );
176
177 Ok(Self::next_state())
178 }
179
180 fn as_any(&self) -> &dyn Any {
181 self
182 }
183}
184
185#[cfg(test)]
186mod tests {
187 use std::assert_matches::assert_matches;
188
189 use common_meta::peer::Peer;
190 use common_meta::rpc::router::{Region, RegionRoute};
191 use store_api::storage::RegionId;
192 use uuid::Uuid;
193
194 use crate::error::Error;
195 use crate::procedure::repartition::group::repartition_start::RepartitionStart;
196 use crate::procedure::repartition::plan::RegionDescriptor;
197 use crate::procedure::repartition::test_util::range_expr;
198
199 #[test]
200 fn test_ensure_route_present_missing_source_region() {
201 let source_region = RegionDescriptor {
202 region_id: RegionId::new(1024, 1),
203 partition_expr: range_expr("x", 0, 100),
204 };
205 let target_region = RegionDescriptor {
206 region_id: RegionId::new(1024, 2),
207 partition_expr: range_expr("x", 0, 10),
208 };
209 let region_routes = vec![RegionRoute {
210 region: Region {
211 id: RegionId::new(1024, 2),
212 ..Default::default()
213 },
214 leader_peer: Some(Peer::empty(1)),
215 ..Default::default()
216 }];
217 let err = RepartitionStart::ensure_route_present(
218 Uuid::new_v4(),
219 ®ion_routes,
220 &[source_region],
221 &[target_region],
222 )
223 .unwrap_err();
224 assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
225 }
226
227 #[test]
228 fn test_ensure_route_present_partition_expr_mismatch() {
229 let source_region = RegionDescriptor {
230 region_id: RegionId::new(1024, 1),
231 partition_expr: range_expr("x", 0, 100),
232 };
233 let target_region = RegionDescriptor {
234 region_id: RegionId::new(1024, 2),
235 partition_expr: range_expr("x", 0, 10),
236 };
237 let region_routes = vec![RegionRoute {
238 region: Region {
239 id: RegionId::new(1024, 1),
240 partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
241 ..Default::default()
242 },
243 leader_peer: Some(Peer::empty(1)),
244 ..Default::default()
245 }];
246 let err = RepartitionStart::ensure_route_present(
247 Uuid::new_v4(),
248 ®ion_routes,
249 &[source_region],
250 &[target_region],
251 )
252 .unwrap_err();
253 assert_matches!(err, Error::PartitionExprMismatch { .. });
254 }
255
256 #[test]
257 fn test_ensure_route_present_missing_target_region() {
258 let source_region = RegionDescriptor {
259 region_id: RegionId::new(1024, 1),
260 partition_expr: range_expr("x", 0, 100),
261 };
262 let target_region = RegionDescriptor {
263 region_id: RegionId::new(1024, 2),
264 partition_expr: range_expr("x", 0, 10),
265 };
266 let region_routes = vec![RegionRoute {
267 region: Region {
268 id: RegionId::new(1024, 1),
269 partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
270 ..Default::default()
271 },
272 leader_peer: Some(Peer::empty(1)),
273 ..Default::default()
274 }];
275 let err = RepartitionStart::ensure_route_present(
276 Uuid::new_v4(),
277 ®ion_routes,
278 &[source_region],
279 &[target_region],
280 )
281 .unwrap_err();
282 assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
283 }
284}