// meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

use common_error::ext::BoxedError;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::rpc::router::{region_distribution, RegionRoute};
use common_telemetry::{info, warn};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{self, Result};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::Context;
25impl UpdateMetadata {
26 async fn build_upgrade_candidate_region_metadata(
28 &self,
29 ctx: &mut Context,
30 ) -> Result<Vec<RegionRoute>> {
31 let region_id = ctx.region_id();
32 let table_route_value = ctx.get_table_route_value().await?.clone();
33
34 let mut region_routes = table_route_value
35 .region_routes()
36 .context(error::UnexpectedLogicalRouteTableSnafu {
37 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
38 })?
39 .clone();
40 let region_route = region_routes
41 .iter_mut()
42 .find(|route| route.region.id == region_id)
43 .context(error::RegionRouteNotFoundSnafu { region_id })?;
44
45 region_route.set_leader_state(None);
47
48 let candidate = &ctx.persistent_ctx.to_peer;
49 let expected_old_leader = &ctx.persistent_ctx.from_peer;
50
51 ensure!(region_route
53 .leader_peer
54 .take_if(|old_leader| old_leader.id == expected_old_leader.id)
55 .is_some(),
56 error::UnexpectedSnafu{
57 violated: format!("Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}", region_route.leader_peer, expected_old_leader),
58 }
59 );
60
61 region_route.leader_peer = Some(candidate.clone());
62 info!(
63 "Upgrading candidate region to leader region: {:?} for region: {}",
64 candidate, region_id
65 );
66
67 let removed = region_route
69 .follower_peers
70 .extract_if(.., |peer| peer.id == candidate.id)
71 .collect::<Vec<_>>();
72
73 if removed.len() > 1 {
74 warn!(
75 "Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}"
76 );
77 }
78
79 Ok(region_routes)
80 }
81
82 async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> {
84 let region_id = ctx.region_id();
85 let table_route_value = ctx.get_table_route_value().await?.clone();
86
87 let region_routes = table_route_value
88 .region_routes()
89 .context(error::UnexpectedLogicalRouteTableSnafu {
90 err_msg: format!("{self:?} is a non-physical TableRouteValue."),
91 })?
92 .clone();
93 let region_route = region_routes
94 .into_iter()
95 .find(|route| route.region.id == region_id)
96 .context(error::RegionRouteNotFoundSnafu { region_id })?;
97
98 let leader_peer = region_route
99 .leader_peer
100 .as_ref()
101 .context(error::UnexpectedSnafu {
102 violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"),
103 })?;
104
105 let candidate_peer_id = ctx.persistent_ctx.to_peer.id;
106
107 if leader_peer.id == candidate_peer_id {
108 ensure!(
109 !region_route.is_leader_downgrading(),
110 error::UnexpectedSnafu {
111 violated: format!("Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"),
112 }
113 );
114
115 Ok(true)
116 } else {
117 Ok(false)
118 }
119 }
120
121 pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> {
131 let region_id = ctx.region_id();
132 let table_metadata_manager = ctx.table_metadata_manager.clone();
133
134 if self.check_metadata_updated(ctx).await? {
135 return Ok(());
136 }
137
138 let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?;
139 let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
140 let RegionInfo {
141 region_storage_path,
142 region_options,
143 region_wal_options,
144 engine,
145 } = datanode_table_value.region_info.clone();
146 let table_route_value = ctx.get_table_route_value().await?;
147
148 let region_distribution = region_distribution(®ion_routes);
149 info!(
150 "Trying to update region routes to {:?} for table: {}",
151 region_distribution,
152 region_id.table_id()
153 );
154 if let Err(err) = table_metadata_manager
155 .update_table_route(
156 region_id.table_id(),
157 RegionInfo {
158 engine: engine.to_string(),
159 region_storage_path: region_storage_path.to_string(),
160 region_options: region_options.clone(),
161 region_wal_options: region_wal_options.clone(),
162 },
163 table_route_value,
164 region_routes,
165 ®ion_options,
166 ®ion_wal_options,
167 )
168 .await
169 .context(error::TableMetadataManagerSnafu)
170 {
171 ctx.remove_table_route_value();
172 return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
173 reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
174 });
175 };
176
177 ctx.remove_table_route_value();
178 ctx.deregister_failure_detectors().await;
179 ctx.volatile_ctx.opening_region_guard.take();
181
182 Ok(())
183 }
184}
185
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Migrates region 1 of table 1024 from datanode 1 to datanode 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // No table metadata was created, so fetching the route must fail.
        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // The table only contains region 2, but the migration targets region 1.
        let table_info = new_test_table_info(1024, vec![2]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // The recorded leader (peer 3) differs from the expected from_peer (peer 1).
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // Downgrading leader (peer 1) with the candidate (peer 2) among the followers.
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap();

        // The downgrading marker is cleared, peer 2 is promoted to leader, and
        // it has been removed from the follower list.
        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(5), Peer::empty(3)],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(current_time_millis()),
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let original_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // Bump the stored route version behind the context's back so the
        // subsequent compare-and-swap update fails.
        table_metadata_manager
            .update_leader_region_status(table_id, &original_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    Some(None)
                } else {
                    None
                }
            })
            .await
            .unwrap();

        ctx.volatile_ctx.table_route = Some(original_table_route);
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);
        let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err();

        // The stale cached route is dropped, the guard is kept, and the error
        // is retryable.
        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_some());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
    }

    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        // The from_peer is still the leader: nothing has been updated yet.
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        // The to_peer is already the leader with no leader state: update done.
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        // Candidate is leader but still flagged as downgrading — an
        // intermediate state that must be rejected.
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The upgrade step transitions to closing the downgraded region.
        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        // Volatile state was released and the persisted route now records
        // peer 2 as the (non-downgrading) leader with no followers.
        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_none());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}