meta_srv/procedure/repartition/group/
repartition_start.rs1use std::any::Any;
16use std::collections::HashMap;
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::{
26 Context, GroupId, GroupPrepareResult, State, region_routes,
27};
28use crate::procedure::repartition::plan::RegionDescriptor;
29
30#[derive(Debug, Serialize, Deserialize)]
31pub struct RepartitionStart;
32
33fn ensure_region_route_expr_match(
35 region_route: &RegionRoute,
36 region_descriptor: &RegionDescriptor,
37) -> Result<RegionRoute> {
38 let actual = ®ion_route.region.partition_expr;
39 let expected = region_descriptor
40 .partition_expr
41 .as_json_str()
42 .context(error::SerializePartitionExprSnafu)?;
43 ensure!(
44 actual == &expected,
45 error::PartitionExprMismatchSnafu {
46 region_id: region_route.region.id,
47 expected,
48 actual,
49 }
50 );
51 Ok(region_route.clone())
52}
53
54impl RepartitionStart {
55 #[allow(dead_code)]
59 fn ensure_route_present(
60 group_id: GroupId,
61 region_routes: &[RegionRoute],
62 sources: &[RegionDescriptor],
63 targets: &[RegionDescriptor],
64 ) -> Result<GroupPrepareResult> {
65 ensure!(
66 !sources.is_empty(),
67 error::UnexpectedSnafu {
68 violated: "Sources are empty"
69 }
70 );
71
72 let region_routes_map = region_routes
73 .iter()
74 .map(|r| (r.region.id, r))
75 .collect::<HashMap<_, _>>();
76 let source_region_routes = sources
77 .iter()
78 .map(|s| {
79 region_routes_map
80 .get(&s.region_id)
81 .context(error::RepartitionSourceRegionMissingSnafu {
82 group_id,
83 region_id: s.region_id,
84 })
85 .and_then(|r| ensure_region_route_expr_match(r, s))
86 })
87 .collect::<Result<Vec<_>>>()?;
88 let target_region_routes = targets
89 .iter()
90 .map(|t| {
91 region_routes_map
92 .get(&t.region_id)
93 .context(error::RepartitionTargetRegionMissingSnafu {
94 group_id,
95 region_id: t.region_id,
96 })
97 .map(|r| (*r).clone())
98 })
99 .collect::<Result<Vec<_>>>()?;
100 let central_region = sources[0].region_id;
101 let central_region_datanode_id = source_region_routes[0]
102 .leader_peer
103 .as_ref()
104 .context(error::UnexpectedSnafu {
105 violated: format!(
106 "Leader peer is not set for central region: {}",
107 central_region
108 ),
109 })?
110 .id;
111
112 Ok(GroupPrepareResult {
113 source_routes: source_region_routes,
114 target_routes: target_region_routes,
115 central_region,
116 central_region_datanode_id,
117 })
118 }
119
120 #[allow(dead_code)]
121 fn next_state() -> (Box<dyn State>, Status) {
122 (Box::new(RepartitionStart), Status::executing(true))
124 }
125}
126
127#[async_trait::async_trait]
128#[typetag::serde]
129impl State for RepartitionStart {
130 async fn next(
141 &mut self,
142 ctx: &mut Context,
143 _procedure_ctx: &ProcedureContext,
144 ) -> Result<(Box<dyn State>, Status)> {
145 if ctx.persistent_ctx.group_prepare_result.is_some() {
146 return Ok(Self::next_state());
147 }
148 let table_id = ctx.persistent_ctx.table_id;
149 let group_id = ctx.persistent_ctx.group_id;
150 let table_route_value = ctx.get_table_route_value().await?.into_inner();
151 let region_routes = region_routes(table_id, &table_route_value)?;
152 let group_prepare_result = Self::ensure_route_present(
153 group_id,
154 region_routes,
155 &ctx.persistent_ctx.sources,
156 &ctx.persistent_ctx.targets,
157 )?;
158 ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
159 debug!(
160 "Repartition group {}: captured {} sources, {} targets",
161 group_id,
162 ctx.persistent_ctx.sources.len(),
163 ctx.persistent_ctx.targets.len()
164 );
165
166 Ok(Self::next_state())
167 }
168
169 fn as_any(&self) -> &dyn Any {
170 self
171 }
172}
173
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use crate::error::Error;
    use crate::procedure::repartition::group::repartition_start::RepartitionStart;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Id of the source region used by every test.
    fn source_id() -> RegionId {
        RegionId::new(1024, 1)
    }

    /// Id of the target region used by every test.
    fn target_id() -> RegionId {
        RegionId::new(1024, 2)
    }

    /// Builds the `(source, target)` descriptors shared by all tests.
    fn descriptors() -> (RegionDescriptor, RegionDescriptor) {
        let source = RegionDescriptor {
            region_id: source_id(),
            partition_expr: range_expr("x", 0, 100),
        };
        let target = RegionDescriptor {
            region_id: target_id(),
            partition_expr: range_expr("x", 0, 10),
        };
        (source, target)
    }

    /// Wraps `region` in a route whose leader peer lives on `datanode_id`.
    fn route_with_leader(region: Region, datanode_id: u64) -> RegionRoute {
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(datanode_id)),
            ..Default::default()
        }
    }

    /// Runs `ensure_route_present` with the shared descriptors and returns
    /// the error it is expected to produce.
    fn ensure_route_present_err(region_routes: &[RegionRoute]) -> Error {
        let (source, target) = descriptors();
        RepartitionStart::ensure_route_present(
            Uuid::new_v4(),
            region_routes,
            &[source],
            &[target],
        )
        .unwrap_err()
    }

    #[test]
    fn test_ensure_route_present_missing_source_region() {
        // Only the target region has a route.
        let region_routes = vec![route_with_leader(
            Region {
                id: target_id(),
                ..Default::default()
            },
            1,
        )];
        let err = ensure_route_present_err(&region_routes);
        assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_partition_expr_mismatch() {
        // The source route exists but carries a different partition expression.
        let region_routes = vec![route_with_leader(
            Region {
                id: source_id(),
                partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
                ..Default::default()
            },
            1,
        )];
        let err = ensure_route_present_err(&region_routes);
        assert_matches!(err, Error::PartitionExprMismatch { .. });
    }

    #[test]
    fn test_ensure_route_present_missing_target_region() {
        // The source route is valid, but the target region has no route.
        let region_routes = vec![route_with_leader(
            Region {
                id: source_id(),
                partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
                ..Default::default()
            },
            1,
        )];
        let err = ensure_route_present_err(&region_routes);
        assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_missing_leader_peer() {
        // Both routes exist, but the central (first source) region has no leader.
        let region_routes = vec![
            RegionRoute {
                region: Region {
                    id: source_id(),
                    partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
                    ..Default::default()
                },
                leader_peer: None,
                ..Default::default()
            },
            route_with_leader(
                Region {
                    id: target_id(),
                    ..Default::default()
                },
                2,
            ),
        ];
        let err = ensure_route_present_err(&region_routes);
        assert_matches!(err, Error::Unexpected { .. });
    }

    #[test]
    fn test_ensure_route_present_success() {
        let region_routes = vec![
            route_with_leader(
                Region {
                    id: source_id(),
                    partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
                    ..Default::default()
                },
                1,
            ),
            route_with_leader(
                Region {
                    id: target_id(),
                    ..Default::default()
                },
                2,
            ),
        ];
        let (source, target) = descriptors();
        let result = RepartitionStart::ensure_route_present(
            Uuid::new_v4(),
            &region_routes,
            &[source],
            &[target],
        )
        .unwrap();

        assert_eq!(result.source_routes.len(), 1);
        assert_eq!(result.target_routes.len(), 1);
        // The first source region is the central region, served by datanode 1.
        assert_eq!(result.central_region, source_id());
        assert_eq!(result.central_region_datanode_id, 1);
    }
}