meta_srv/procedure/region_migration/
migration_start.rs1use std::any::Any;
16
17use common_meta::peer::Peer;
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
26use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
27use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
28use crate::procedure::region_migration::{Context, State};
29
/// Region migration state that inspects the current region routes and decides whether the
/// migration ends ([`RegionMigrationEnd`]), aborts ([`RegionMigrationAbort`]), or proceeds
/// to [`OpenCandidateRegion`].
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart;
37
38#[async_trait::async_trait]
39#[typetag::serde]
40impl State for RegionMigrationStart {
41 async fn next(
49 &mut self,
50 ctx: &mut Context,
51 _procedure_ctx: &ProcedureContext,
52 ) -> Result<(Box<dyn State>, Status)> {
53 let mut region_routes = self.retrieve_region_routes(ctx).await?;
54 let to_peer = &ctx.persistent_ctx.to_peer;
55 let from_peer = &ctx.persistent_ctx.from_peer;
56 let region_ids = &ctx.persistent_ctx.region_ids;
57
58 self.filter_unmigrated_regions(&mut region_routes, to_peer);
59
60 if region_routes.is_empty() {
62 info!(
63 "All regions have been migrated, regions: {:?}, to_peer: {:?}",
64 region_ids, to_peer
65 );
66 return Ok((Box::new(RegionMigrationEnd), Status::done()));
67 }
68
69 if region_routes.len() != region_ids.len() {
71 let unmigrated_region_ids = region_routes.iter().map(|route| route.region.id).collect();
72 info!(
73 "Some of the regions have been migrated, only migrate the following regions: {:?}, to_peer: {:?}",
74 unmigrated_region_ids, to_peer
75 );
76 ctx.persistent_ctx.region_ids = unmigrated_region_ids;
77 }
78
79 for region_route in ®ion_routes {
81 if self.invalid_leader_peer(region_route, from_peer)? {
82 info!(
83 "Abort region migration, region:{}, unexpected leader peer: {:?}, expected: {:?}",
84 region_route.region.id, region_route.leader_peer, from_peer,
85 );
86 return Ok((
87 Box::new(RegionMigrationAbort::new(&format!(
88 "Invalid region leader peer: {:?}, expected: {:?}",
89 region_route.leader_peer.as_ref().unwrap(),
90 from_peer,
91 ))),
92 Status::done(),
93 ));
94 }
95 }
96
97 Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
99 }
100
101 fn as_any(&self) -> &dyn Any {
102 self
103 }
104}
105
106impl RegionMigrationStart {
107 async fn retrieve_region_routes(&self, ctx: &mut Context) -> Result<Vec<RegionRoute>> {
116 let region_ids = &ctx.persistent_ctx.region_ids;
117 let table_route_values = ctx.get_table_route_values().await?;
118 let mut region_routes = Vec::with_capacity(region_ids.len());
119 for region_id in region_ids {
120 let table_id = region_id.table_id();
121 let region_route = table_route_values
122 .get(&table_id)
123 .context(error::TableRouteNotFoundSnafu { table_id })?
124 .region_routes()
125 .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
126 err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
127 })?
128 .iter()
129 .find(|route| route.region.id == *region_id)
130 .cloned()
131 .with_context(|| error::UnexpectedSnafu {
132 violated: format!(
133 "RegionRoute({}) is not found in TableRoute({})",
134 region_id, table_id
135 ),
136 })?;
137 region_routes.push(region_route);
138 }
139
140 Ok(region_routes)
141 }
142
143 fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
148 let region_id = region_route.region.id;
149
150 let is_invalid_leader_peer = region_route
151 .leader_peer
152 .as_ref()
153 .with_context(|| error::UnexpectedSnafu {
154 violated: format!("Leader peer is not found in TableRoute({})", region_id),
155 })?
156 .id
157 != from_peer.id;
158 Ok(is_invalid_leader_peer)
159 }
160
161 fn filter_unmigrated_regions(&self, region_routes: &mut Vec<RegionRoute>, to_peer: &Peer) {
163 region_routes
164 .retain(|region_route| !self.has_migrated(region_route, to_peer).unwrap_or(false));
165 }
166
167 fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
173 let region_id = region_route.region.id;
174
175 let region_migrated = region_route
176 .leader_peer
177 .as_ref()
178 .with_context(|| error::UnexpectedSnafu {
179 violated: format!("Leader peer is not found in TableRoute({})", region_id),
180 })?
181 .id
182 == to_peer.id;
183 Ok(region_migrated)
184 }
185}
186
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    /// Persistent context migrating region 1 of table 1024 between peers 1 and 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// No table metadata exists, so the table route lookup must fail (non-retryable).
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = RegionMigrationStart;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The table exists but only has region 3, so looking up region 1 must fail.
    #[tokio::test]
    async fn test_region_route_is_not_found_error() {
        let state = RegionMigrationStart;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![3]).into();
        let region_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 3)),
            leader_peer: Some(from_peer.clone()),
            ..Default::default()
        };

        env.create_physical_table_metadata(table_info, vec![region_route])
            .await;
        let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    /// The only region is already led by `to_peer`, so the procedure ends.
    #[tokio::test]
    async fn test_next_migration_end_state() {
        let mut state = Box::new(RegionMigrationStart);
        let persistent_context = new_persistent_context();
        let to_peer = persistent_context.to_peer.clone();
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(to_peer),
            follower_peers: vec![from_peer],
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
    }

    /// The only region is led by `from_peer`, so the procedure opens a candidate region.
    #[tokio::test]
    async fn test_next_open_candidate_region_state() {
        let mut state = Box::new(RegionMigrationStart);
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
    }

    /// Region 1 is already on `to_peer` while region 2 is still on `from_peer`: only region 2
    /// must remain in the persisted region ids, and the procedure proceeds.
    #[tokio::test]
    async fn test_next_partially_migrated_regions() {
        let mut state = Box::new(RegionMigrationStart);
        let mut persistent_context = new_persistent_context();
        let migrated_region_id = RegionId::new(1024, 1);
        let unmigrated_region_id = RegionId::new(1024, 2);
        persistent_context.region_ids = vec![migrated_region_id, unmigrated_region_id];
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1, 2]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(migrated_region_id),
                leader_peer: Some(to_peer),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(unmigrated_region_id),
                leader_peer: Some(from_peer),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
        assert_eq!(ctx.persistent_ctx.region_ids, vec![unmigrated_region_id]);
    }

    /// The region is led by neither `from_peer` nor `to_peer`, so the procedure aborts.
    #[tokio::test]
    async fn test_next_migration_abort() {
        let mut state = Box::new(RegionMigrationStart);
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes: Vec<RegionRoute> = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1024)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<RegionMigrationAbort>()
            .unwrap();
    }
}