meta_srv/procedure/region_migration/update_metadata/
upgrade_candidate_region.rs1use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::lock_key::TableLock;
18use common_meta::rpc::router::{RegionRoute, region_distribution};
19use common_procedure::ContextProviderRef;
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::Context;
26use crate::procedure::region_migration::update_metadata::UpdateMetadata;
27
28impl UpdateMetadata {
29 fn build_upgrade_candidate_region_metadata(
31 &self,
32 ctx: &mut Context,
33 region_ids: &[RegionId],
34 mut region_routes: Vec<RegionRoute>,
35 ) -> Result<Vec<RegionRoute>> {
36 let old_leader_peer = &ctx.persistent_ctx.from_peer;
37 let new_leader_peer = &ctx.persistent_ctx.to_peer;
38 for region_id in region_ids {
39 let region_route = region_routes
41 .iter_mut()
42 .find(|route| route.region.id == *region_id)
43 .context(error::RegionRouteNotFoundSnafu {
44 region_id: *region_id,
45 })?;
46
47 region_route.set_leader_state(None);
49
50 ensure!(
52 region_route
53 .leader_peer
54 .take_if(|old_leader| old_leader.id == old_leader_peer.id)
55 .is_some(),
56 error::UnexpectedSnafu {
57 violated: format!(
58 "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
59 region_route.leader_peer, old_leader_peer
60 ),
61 }
62 );
63
64 region_route.leader_peer = Some(new_leader_peer.clone());
66
67 let removed = region_route
69 .follower_peers
70 .extract_if(.., |peer| peer.id == new_leader_peer.id)
71 .collect::<Vec<_>>();
72
73 if removed.len() > 1 {
75 warn!(
76 "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
77 );
78 }
79 }
80
81 info!(
82 "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
83 new_leader_peer, region_ids,
84 );
85
86 Ok(region_routes)
87 }
88
89 fn check_metadata_updated(
98 &self,
99 ctx: &mut Context,
100 region_ids: &[RegionId],
101 region_routes: &[RegionRoute],
102 ) -> Result<bool> {
103 for region_id in region_ids {
105 let region_route = region_routes
107 .iter()
108 .find(|route| route.region.id == *region_id)
109 .context(error::RegionRouteNotFoundSnafu {
110 region_id: *region_id,
111 })?;
112
113 let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu {
115 violated: format!(
116 "The leader peer of region {region_id} is not found during the metadata upgrade check"
117 ),
118 })?;
119
120 if leader_peer.id != ctx.persistent_ctx.to_peer.id {
122 return Ok(false);
123 } else {
124 ensure!(
126 !region_route.is_leader_downgrading(),
127 error::UnexpectedSnafu {
128 violated: format!(
129 "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
130 ),
131 }
132 );
133 }
134 }
135
136 Ok(true)
138 }
139
140 pub async fn upgrade_candidate_region(
150 &self,
151 ctx: &mut Context,
152 ctx_provider: &ContextProviderRef,
153 ) -> Result<()> {
154 let table_metadata_manager = ctx.table_metadata_manager.clone();
155 let table_regions = ctx.persistent_ctx.table_regions();
156 let from_peer_id = ctx.persistent_ctx.from_peer.id;
157 let to_peer_id = ctx.persistent_ctx.to_peer.id;
158
159 for (table_id, region_ids) in table_regions {
160 let table_lock = TableLock::Write(table_id).into();
161 let _guard = ctx_provider.acquire_lock(&table_lock).await;
162
163 let table_route_value = ctx.get_table_route_value(table_id).await?;
164 let region_routes = table_route_value.region_routes().with_context(|_| {
165 error::UnexpectedLogicalRouteTableSnafu {
166 err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
167 }
168 })?;
169 if self.check_metadata_updated(ctx, ®ion_ids, region_routes)? {
170 continue;
171 }
172 let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
173 let RegionInfo {
174 region_storage_path,
175 region_options,
176 region_wal_options,
177 engine,
178 } = datanode_table_value.region_info.clone();
179 let new_region_routes = self.build_upgrade_candidate_region_metadata(
180 ctx,
181 ®ion_ids,
182 region_routes.clone(),
183 )?;
184 let region_distribution = region_distribution(region_routes);
185 info!(
186 "Trying to update region routes to {:?} for table: {}",
187 region_distribution, table_id,
188 );
189
190 if let Err(err) = table_metadata_manager
191 .update_table_route(
192 table_id,
193 RegionInfo {
194 engine: engine.clone(),
195 region_storage_path: region_storage_path.clone(),
196 region_options: region_options.clone(),
197 region_wal_options: region_wal_options.clone(),
198 },
199 &table_route_value,
200 new_region_routes,
201 ®ion_options,
202 ®ion_wal_options,
203 )
204 .await
205 .context(error::TableMetadataManagerSnafu)
206 {
207 error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
208 return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
209 reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
210 });
211 };
212 info!(
213 "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
214 );
215 }
216
217 ctx.deregister_failure_detectors().await;
218 ctx.reset_failure_detectors_for_candidate_regions().await;
219 ctx.volatile_ctx.opening_region_guards.clear();
221
222 Ok(())
223 }
224}
225
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{Context, ContextFactory, PersistentContext, State};

    /// Table id used by every test in this module.
    const TABLE_ID: u32 = 1024;

    /// Migration context moving region 1 of `TABLE_ID` from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(TABLE_ID, 1))
    }

    /// Reads the physical region routes of `table_id` through the migration context.
    async fn physical_region_routes(ctx: &Context, table_id: u32) -> Vec<RegionRoute> {
        ctx.get_table_route_value(table_id)
            .await
            .unwrap()
            .into_inner()
            .into_physical_table_route()
            .region_routes
    }

    /// Creates a table whose only region (region 1) is led by the candidate (`to_peer`)
    /// when `candidate_is_leader` is set, or by the old leader (`from_peer`) otherwise.
    /// The region has followers 2 and 3 and the given `leader_state`.
    /// Runs `check_metadata_updated` against the resulting routes.
    async fn check_metadata_with_leader(
        candidate_is_leader: bool,
        leader_state: Option<LeaderState>,
    ) -> Result<bool, Error> {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader = if candidate_is_leader {
            persistent_context.to_peer.clone()
        } else {
            persistent_context.from_peer.clone()
        };
        let mut ctx = env.context_factory().new_context(persistent_context);

        env.create_physical_table_metadata(
            new_test_table_info(TABLE_ID),
            vec![RegionRoute {
                region: Region::new_test(RegionId::new(TABLE_ID, 1)),
                leader_peer: Some(leader),
                follower_peers: vec![Peer::empty(2), Peer::empty(3)],
                leader_state,
                leader_down_since: None,
                write_route_policy: None,
            }],
        )
        .await;

        let routes = physical_region_routes(&ctx, TABLE_ID).await;
        UpdateMetadata::Upgrade.check_metadata_updated(
            &mut ctx,
            &[RegionId::new(TABLE_ID, 1)],
            &routes,
        )
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let ctx = env.context_factory().new_context(new_persistent_context());

        // No table metadata was created, so the lookup must fail.
        let err = ctx.get_table_route_value(TABLE_ID).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // The table only routes region 2, while the upgrade targets region 1.
        env.create_physical_table_metadata(
            new_test_table_info(TABLE_ID),
            vec![RegionRoute {
                region: Region::new_test(RegionId::new(TABLE_ID, 2)),
                leader_peer: Some(Peer::empty(4)),
                ..Default::default()
            }],
        )
        .await;

        let routes = physical_region_routes(&ctx, TABLE_ID).await;
        let err = UpdateMetadata::Upgrade
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(TABLE_ID, 1)],
                routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // Region 1 is led by peer 3, but the migration expects peer 1 as the old leader.
        env.create_physical_table_metadata(
            new_test_table_info(TABLE_ID),
            vec![RegionRoute {
                region: Region::new_test(RegionId::new(TABLE_ID, 1)),
                leader_peer: Some(Peer::empty(3)),
                ..Default::default()
            }],
        )
        .await;

        let routes = physical_region_routes(&ctx, TABLE_ID).await;
        let err = UpdateMetadata::Upgrade
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(TABLE_ID, 1)],
                routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        env.create_physical_table_metadata(
            new_test_table_info(TABLE_ID),
            vec![RegionRoute {
                region: Region::new_test(RegionId::new(TABLE_ID, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(2), Peer::empty(3)],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(current_time_millis()),
                write_route_policy: None,
            }],
        )
        .await;

        let routes = physical_region_routes(&ctx, TABLE_ID).await;
        let upgraded = UpdateMetadata::Upgrade
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(TABLE_ID, 1)],
                routes,
            )
            .unwrap();
        let route = &upgraded[0];

        // Peer 2 becomes the leader, loses its follower entry, and the downgrade marker is gone.
        assert!(!route.is_leader_downgrading());
        assert!(route.leader_down_since.is_none());
        assert_eq!(route.follower_peers, vec![Peer::empty(3)]);
        assert_eq!(route.leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_check_metadata() {
        // The old leader still leads, so the metadata is not updated yet.
        let updated = check_metadata_with_leader(false, None).await.unwrap();
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        // The candidate already leads with no leader state, so the metadata is updated.
        let updated = check_metadata_with_leader(true, None).await.unwrap();
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        // The candidate leads but is still flagged as downgrading, which is rejected.
        let err = check_metadata_with_leader(true, Some(LeaderState::Downgrading))
            .await
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let opening_keeper = MemoryRegionKeeper::default();
        let region_id = RegionId::new(TABLE_ID, 1);

        // Register an opening guard for the candidate region on datanode 2.
        let opening_guard = opening_keeper
            .register_with_role(2, region_id, RegionRole::Follower)
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(opening_guard);

        env.create_physical_table_metadata(
            new_test_table_info(TABLE_ID),
            vec![RegionRoute {
                region: Region::new_test(region_id),
                leader_peer: Some(Peer::empty(1)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            }],
        )
        .await;

        let metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next_state, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        assert!(
            next_state
                .as_any()
                .downcast_ref::<CloseDowngradedRegion>()
                .is_some()
        );

        let stored_route = metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(TABLE_ID)
            .await
            .unwrap()
            .unwrap();
        let stored_region_routes = stored_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(stored_region_routes.len(), 1);
        let upgraded = &stored_region_routes[0];
        assert!(!upgraded.is_leader_downgrading());
        assert!(upgraded.follower_peers.is_empty());
        assert_eq!(upgraded.leader_peer.as_ref().unwrap().id, 2);
    }
}