meta_srv/procedure/region_migration/
migration_start.rs1use std::any::Any;
16
17use common_meta::peer::Peer;
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{self, Result};
26use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
27use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
28use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
29use crate::procedure::region_migration::update_metadata::UpdateMetadata;
30use crate::procedure::region_migration::{Context, State};
31
32#[derive(Debug, Serialize, Deserialize)]
40pub struct RegionMigrationStart;
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for RegionMigrationStart {
45 async fn next(
53 &mut self,
54 ctx: &mut Context,
55 _procedure_ctx: &ProcedureContext,
56 ) -> Result<(Box<dyn State>, Status)> {
57 let region_id = ctx.persistent_ctx.region_id;
58 let region_route = self.retrieve_region_route(ctx, region_id).await?;
59 let to_peer = &ctx.persistent_ctx.to_peer;
60 let from_peer = &ctx.persistent_ctx.from_peer;
61
62 if self.has_migrated(®ion_route, to_peer)? {
63 info!(
64 "Region has been migrated, region: {:?}, to_peer: {:?}",
65 region_route.region.id, to_peer
66 );
67 Ok((Box::new(RegionMigrationEnd), Status::done()))
68 } else if self.invalid_leader_peer(®ion_route, from_peer)? {
69 info!(
70 "Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
71 region_route.region.id, region_route.leader_peer, from_peer,
72 );
73 Ok((
74 Box::new(RegionMigrationAbort::new(&format!(
75 "Invalid region leader peer: {from_peer:?}, expected: {:?}",
76 region_route.leader_peer.as_ref().unwrap(),
77 ))),
78 Status::done(),
79 ))
80 } else if self.check_candidate_region_on_peer(®ion_route, to_peer) {
81 Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
82 } else {
83 Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
84 }
85 }
86
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90}
91
92impl RegionMigrationStart {
93 async fn retrieve_region_route(
102 &self,
103 ctx: &mut Context,
104 region_id: RegionId,
105 ) -> Result<RegionRoute> {
106 let table_id = region_id.table_id();
107 let table_route = ctx.get_table_route_value().await?;
108
109 let region_route = table_route
110 .region_routes()
111 .context(error::UnexpectedLogicalRouteTableSnafu {
112 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113 })?
114 .iter()
115 .find(|route| route.region.id == region_id)
116 .cloned()
117 .context(error::UnexpectedSnafu {
118 violated: format!(
119 "RegionRoute({}) is not found in TableRoute({})",
120 region_id, table_id
121 ),
122 })?;
123
124 Ok(region_route)
125 }
126
127 fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
130 let region_opened = region_route
131 .follower_peers
132 .iter()
133 .any(|peer| peer.id == to_peer.id);
134
135 region_opened
136 }
137
138 fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
143 let region_id = region_route.region.id;
144
145 let is_invalid_leader_peer = region_route
146 .leader_peer
147 .as_ref()
148 .context(error::UnexpectedSnafu {
149 violated: format!("Leader peer is not found in TableRoute({})", region_id),
150 })?
151 .id
152 != from_peer.id;
153 Ok(is_invalid_leader_peer)
154 }
155
156 fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
162 let region_id = region_route.region.id;
163
164 let region_migrated = region_route
165 .leader_peer
166 .as_ref()
167 .context(error::UnexpectedSnafu {
168 violated: format!("Leader peer is not found in TableRoute({})", region_id),
169 })?
170 .id
171 == to_peer.id;
172 Ok(region_migrated)
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use std::assert_matches::assert_matches;
179
180 use common_meta::key::test_utils::new_test_table_info;
181 use common_meta::peer::Peer;
182 use common_meta::rpc::router::{Region, RegionRoute};
183 use store_api::storage::RegionId;
184
185 use super::*;
186 use crate::error::Error;
187 use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
188 use crate::procedure::region_migration::update_metadata::UpdateMetadata;
189 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
190
191 fn new_persistent_context() -> PersistentContext {
192 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
193 }
194
195 #[tokio::test]
196 async fn test_table_route_is_not_found_error() {
197 let state = RegionMigrationStart;
198 let env = TestingEnv::new();
199 let persistent_context = new_persistent_context();
200 let mut ctx = env.context_factory().new_context(persistent_context);
201
202 let err = state
203 .retrieve_region_route(&mut ctx, RegionId::new(1024, 1))
204 .await
205 .unwrap_err();
206
207 assert_matches!(err, Error::TableRouteNotFound { .. });
208
209 assert!(!err.is_retryable());
210 }
211
212 #[tokio::test]
213 async fn test_region_route_is_not_found_error() {
214 let state = RegionMigrationStart;
215 let persistent_context = new_persistent_context();
216 let from_peer = persistent_context.from_peer.clone();
217
218 let env = TestingEnv::new();
219 let mut ctx = env.context_factory().new_context(persistent_context);
220
221 let table_info = new_test_table_info(1024, vec![1]).into();
222 let region_route = RegionRoute {
223 region: Region::new_test(RegionId::new(1024, 1)),
224 leader_peer: Some(from_peer.clone()),
225 ..Default::default()
226 };
227
228 env.create_physical_table_metadata(table_info, vec![region_route])
229 .await;
230
231 let err = state
232 .retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
233 .await
234 .unwrap_err();
235
236 assert_matches!(err, Error::Unexpected { .. });
237 assert!(!err.is_retryable());
238 }
239
240 #[tokio::test]
241 async fn test_next_update_metadata_downgrade_state() {
242 let mut state = Box::new(RegionMigrationStart);
243 let persistent_context = new_persistent_context();
246 let from_peer_id = persistent_context.from_peer.id;
247 let to_peer = persistent_context.to_peer.clone();
248 let region_id = persistent_context.region_id;
249
250 let env = TestingEnv::new();
251 let mut ctx = env.context_factory().new_context(persistent_context);
252
253 let table_info = new_test_table_info(1024, vec![1]).into();
254 let region_routes = vec![RegionRoute {
255 region: Region::new_test(region_id),
256 leader_peer: Some(Peer::empty(from_peer_id)),
257 follower_peers: vec![to_peer],
258 ..Default::default()
259 }];
260
261 env.create_physical_table_metadata(table_info, region_routes)
262 .await;
263 let procedure_ctx = new_procedure_context();
264 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
265
266 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
267
268 assert_matches!(update_metadata, UpdateMetadata::Downgrade);
269 }
270
271 #[tokio::test]
272 async fn test_next_migration_end_state() {
273 let mut state = Box::new(RegionMigrationStart);
274 let persistent_context = new_persistent_context();
277 let to_peer = persistent_context.to_peer.clone();
278 let from_peer = persistent_context.from_peer.clone();
279 let region_id = persistent_context.region_id;
280
281 let env = TestingEnv::new();
282 let mut ctx = env.context_factory().new_context(persistent_context);
283
284 let table_info = new_test_table_info(1024, vec![1]).into();
285 let region_routes = vec![RegionRoute {
286 region: Region::new_test(region_id),
287 leader_peer: Some(to_peer),
288 follower_peers: vec![from_peer],
289 ..Default::default()
290 }];
291
292 env.create_physical_table_metadata(table_info, region_routes)
293 .await;
294 let procedure_ctx = new_procedure_context();
295 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
296
297 let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
298 }
299
300 #[tokio::test]
301 async fn test_next_open_candidate_region_state() {
302 let mut state = Box::new(RegionMigrationStart);
303 let persistent_context = new_persistent_context();
306 let from_peer_id = persistent_context.from_peer.id;
307 let region_id = persistent_context.region_id;
308 let env = TestingEnv::new();
309 let mut ctx = env.context_factory().new_context(persistent_context);
310
311 let table_info = new_test_table_info(1024, vec![1]).into();
312 let region_routes = vec![RegionRoute {
313 region: Region::new_test(region_id),
314 leader_peer: Some(Peer::empty(from_peer_id)),
315 ..Default::default()
316 }];
317
318 env.create_physical_table_metadata(table_info, region_routes)
319 .await;
320 let procedure_ctx = new_procedure_context();
321 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
322
323 let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
324 }
325
326 #[tokio::test]
327 async fn test_next_migration_abort() {
328 let mut state = Box::new(RegionMigrationStart);
329 let persistent_context = new_persistent_context();
332 let region_id = persistent_context.region_id;
333 let env = TestingEnv::new();
334 let mut ctx = env.context_factory().new_context(persistent_context);
335
336 let table_info = new_test_table_info(1024, vec![1]).into();
337 let region_routes = vec![RegionRoute {
338 region: Region::new_test(region_id),
339 leader_peer: Some(Peer::empty(1024)),
340 ..Default::default()
341 }];
342
343 env.create_physical_table_metadata(table_info, region_routes)
344 .await;
345 let procedure_ctx = new_procedure_context();
346 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
347
348 let _ = next
349 .as_any()
350 .downcast_ref::<RegionMigrationAbort>()
351 .unwrap();
352 }
353}