meta_srv/procedure/region_migration/
migration_start.rs1use std::any::Any;
16
17use common_meta::peer::Peer;
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{self, Result};
26use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
27use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
28use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
29use crate::procedure::region_migration::update_metadata::UpdateMetadata;
30use crate::procedure::region_migration::{Context, State};
31
32#[derive(Debug, Serialize, Deserialize)]
40pub struct RegionMigrationStart;
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for RegionMigrationStart {
45 async fn next(
53 &mut self,
54 ctx: &mut Context,
55 _procedure_ctx: &ProcedureContext,
56 ) -> Result<(Box<dyn State>, Status)> {
57 let region_id = ctx.persistent_ctx.region_id;
58 let region_route = self.retrieve_region_route(ctx, region_id).await?;
59 let to_peer = &ctx.persistent_ctx.to_peer;
60 let from_peer = &ctx.persistent_ctx.from_peer;
61
62 if self.has_migrated(®ion_route, to_peer)? {
63 info!(
64 "Region has been migrated, region: {:?}, to_peer: {:?}",
65 region_route.region.id, to_peer
66 );
67 Ok((Box::new(RegionMigrationEnd), Status::done()))
68 } else if self.invalid_leader_peer(®ion_route, from_peer)? {
69 info!(
70 "Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
71 region_route.region.id, region_route.leader_peer, from_peer,
72 );
73 Ok((
74 Box::new(RegionMigrationAbort::new(&format!(
75 "Invalid region leader peer: {from_peer:?}, expected: {:?}",
76 region_route.leader_peer.as_ref().unwrap(),
77 ))),
78 Status::done(),
79 ))
80 } else if self.check_candidate_region_on_peer(®ion_route, to_peer) {
81 Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
82 } else {
83 Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
84 }
85 }
86
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90}
91
92impl RegionMigrationStart {
93 async fn retrieve_region_route(
102 &self,
103 ctx: &mut Context,
104 region_id: RegionId,
105 ) -> Result<RegionRoute> {
106 let table_id = region_id.table_id();
107 let table_route = ctx.get_table_route_value().await?;
108
109 let region_route = table_route
110 .region_routes()
111 .context(error::UnexpectedLogicalRouteTableSnafu {
112 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113 })?
114 .iter()
115 .find(|route| route.region.id == region_id)
116 .cloned()
117 .context(error::UnexpectedSnafu {
118 violated: format!(
119 "RegionRoute({}) is not found in TableRoute({})",
120 region_id, table_id
121 ),
122 })?;
123
124 Ok(region_route)
125 }
126
127 fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
130 region_route
131 .follower_peers
132 .iter()
133 .any(|peer| peer.id == to_peer.id)
134 }
135
136 fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
141 let region_id = region_route.region.id;
142
143 let is_invalid_leader_peer = region_route
144 .leader_peer
145 .as_ref()
146 .context(error::UnexpectedSnafu {
147 violated: format!("Leader peer is not found in TableRoute({})", region_id),
148 })?
149 .id
150 != from_peer.id;
151 Ok(is_invalid_leader_peer)
152 }
153
154 fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
160 let region_id = region_route.region.id;
161
162 let region_migrated = region_route
163 .leader_peer
164 .as_ref()
165 .context(error::UnexpectedSnafu {
166 violated: format!("Leader peer is not found in TableRoute({})", region_id),
167 })?
168 .id
169 == to_peer.id;
170 Ok(region_migrated)
171 }
172}
173
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    /// Builds the persistent context shared by every test: region `(1024, 1)`,
    /// with `1` and `2` forwarded to the test utility (presumably the from-peer
    /// and to-peer ids; verify against `test_util::new_persistent_context`).
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// Without any table metadata the route lookup fails with a
    /// non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let mut ctx = env
            .context_factory()
            .new_context(new_persistent_context());
        let start = RegionMigrationStart;

        let lookup = start
            .retrieve_region_route(&mut ctx, RegionId::new(1024, 1))
            .await;
        let err = lookup.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The table exists but holds no route for region `(1024, 3)`, so the
    /// lookup fails with a non-retryable `Unexpected`.
    #[tokio::test]
    async fn test_region_route_is_not_found_error() {
        let persistent_ctx = new_persistent_context();
        let known_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(persistent_ctx.from_peer.clone()),
            ..Default::default()
        };

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        env.create_physical_table_metadata(
            new_test_table_info(1024, vec![1]).into(),
            vec![known_route],
        )
        .await;

        let start = RegionMigrationStart;
        let absent_region = RegionId::new(1024, 3);
        let err = start
            .retrieve_region_route(&mut ctx, absent_region)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    /// Leader is the from-peer and the to-peer is already a follower:
    /// the next state is `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        let persistent_ctx = new_persistent_context();
        let route = RegionRoute {
            region: Region::new_test(persistent_ctx.region_id),
            leader_peer: Some(Peer::empty(persistent_ctx.from_peer.id)),
            follower_peers: vec![persistent_ctx.to_peer.clone()],
            ..Default::default()
        };

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), vec![route])
            .await;

        let mut start = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = start.next(&mut ctx, &procedure_ctx).await.unwrap();

        let downcast = next.as_any().downcast_ref::<UpdateMetadata>();
        assert_matches!(downcast, Some(UpdateMetadata::Downgrade));
    }

    /// Leader already is the to-peer: the next state is `RegionMigrationEnd`.
    #[tokio::test]
    async fn test_next_migration_end_state() {
        let persistent_ctx = new_persistent_context();
        let route = RegionRoute {
            region: Region::new_test(persistent_ctx.region_id),
            leader_peer: Some(persistent_ctx.to_peer.clone()),
            follower_peers: vec![persistent_ctx.from_peer.clone()],
            ..Default::default()
        };

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), vec![route])
            .await;

        let mut start = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = start.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<RegionMigrationEnd>());
    }

    /// Leader is the from-peer and there are no followers: the next state is
    /// `OpenCandidateRegion`.
    #[tokio::test]
    async fn test_next_open_candidate_region_state() {
        let persistent_ctx = new_persistent_context();
        let route = RegionRoute {
            region: Region::new_test(persistent_ctx.region_id),
            leader_peer: Some(Peer::empty(persistent_ctx.from_peer.id)),
            ..Default::default()
        };

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), vec![route])
            .await;

        let mut start = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = start.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<OpenCandidateRegion>());
    }

    /// Leader is peer `1024`, which is neither migration peer: the next state
    /// is `RegionMigrationAbort`.
    #[tokio::test]
    async fn test_next_migration_abort() {
        let persistent_ctx = new_persistent_context();
        let route = RegionRoute {
            region: Region::new_test(persistent_ctx.region_id),
            leader_peer: Some(Peer::empty(1024)),
            ..Default::default()
        };

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), vec![route])
            .await;

        let mut start = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = start.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<RegionMigrationAbort>());
    }
}