// meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::lock_key::TableLock;
18use common_meta::rpc::router::{RegionRoute, region_distribution};
19use common_procedure::ContextProviderRef;
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::Context;
26use crate::procedure::region_migration::update_metadata::UpdateMetadata;
27
28impl UpdateMetadata {
29 fn build_upgrade_candidate_region_metadata(
31 &self,
32 ctx: &mut Context,
33 region_ids: &[RegionId],
34 mut region_routes: Vec<RegionRoute>,
35 ) -> Result<Vec<RegionRoute>> {
36 let old_leader_peer = &ctx.persistent_ctx.from_peer;
37 let new_leader_peer = &ctx.persistent_ctx.to_peer;
38 for region_id in region_ids {
39 let region_route = region_routes
41 .iter_mut()
42 .find(|route| route.region.id == *region_id)
43 .context(error::RegionRouteNotFoundSnafu {
44 region_id: *region_id,
45 })?;
46
47 region_route.set_leader_state(None);
49
50 ensure!(
52 region_route
53 .leader_peer
54 .take_if(|old_leader| old_leader.id == old_leader_peer.id)
55 .is_some(),
56 error::UnexpectedSnafu {
57 violated: format!(
58 "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
59 region_route.leader_peer, old_leader_peer
60 ),
61 }
62 );
63
64 region_route.leader_peer = Some(new_leader_peer.clone());
66
67 let removed = region_route
69 .follower_peers
70 .extract_if(.., |peer| peer.id == new_leader_peer.id)
71 .collect::<Vec<_>>();
72
73 if removed.len() > 1 {
75 warn!(
76 "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
77 );
78 }
79 }
80
81 info!(
82 "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
83 new_leader_peer, region_ids,
84 );
85
86 Ok(region_routes)
87 }
88
89 fn check_metadata_updated(
98 &self,
99 ctx: &mut Context,
100 region_ids: &[RegionId],
101 region_routes: &[RegionRoute],
102 ) -> Result<bool> {
103 for region_id in region_ids {
105 let region_route = region_routes
107 .iter()
108 .find(|route| route.region.id == *region_id)
109 .context(error::RegionRouteNotFoundSnafu {
110 region_id: *region_id,
111 })?;
112
113 let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu {
115 violated: format!(
116 "The leader peer of region {region_id} is not found during the metadata upgrade check"
117 ),
118 })?;
119
120 if leader_peer.id != ctx.persistent_ctx.to_peer.id {
122 return Ok(false);
123 } else {
124 ensure!(
126 !region_route.is_leader_downgrading(),
127 error::UnexpectedSnafu {
128 violated: format!(
129 "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
130 ),
131 }
132 );
133 }
134 }
135
136 Ok(true)
138 }
139
140 pub async fn upgrade_candidate_region(
150 &self,
151 ctx: &mut Context,
152 ctx_provider: &ContextProviderRef,
153 ) -> Result<()> {
154 let table_metadata_manager = ctx.table_metadata_manager.clone();
155 let table_regions = ctx.persistent_ctx.table_regions();
156 let from_peer_id = ctx.persistent_ctx.from_peer.id;
157 let to_peer_id = ctx.persistent_ctx.to_peer.id;
158
159 for (table_id, region_ids) in table_regions {
160 let table_lock = TableLock::Write(table_id).into();
161 let _guard = ctx_provider.acquire_lock(&table_lock).await;
162
163 let table_route_value = ctx.get_table_route_value(table_id).await?;
164 let region_routes = table_route_value.region_routes().with_context(|_| {
165 error::UnexpectedLogicalRouteTableSnafu {
166 err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
167 }
168 })?;
169 if self.check_metadata_updated(ctx, ®ion_ids, region_routes)? {
170 continue;
171 }
172 let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
173 let RegionInfo {
174 region_storage_path,
175 region_options,
176 region_wal_options,
177 engine,
178 } = datanode_table_value.region_info.clone();
179 let new_region_routes = self.build_upgrade_candidate_region_metadata(
180 ctx,
181 ®ion_ids,
182 region_routes.clone(),
183 )?;
184 let region_distribution = region_distribution(region_routes);
185 info!(
186 "Trying to update region routes to {:?} for table: {}",
187 region_distribution, table_id,
188 );
189
190 if let Err(err) = table_metadata_manager
191 .update_table_route(
192 table_id,
193 RegionInfo {
194 engine: engine.clone(),
195 region_storage_path: region_storage_path.clone(),
196 region_options: region_options.clone(),
197 region_wal_options: region_wal_options.clone(),
198 },
199 &table_route_value,
200 new_region_routes,
201 ®ion_options,
202 ®ion_wal_options,
203 )
204 .await
205 .context(error::TableMetadataManagerSnafu)
206 {
207 error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
208 return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
209 reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
210 });
211 };
212 info!(
213 "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
214 );
215 }
216
217 ctx.deregister_failure_detectors().await;
218 ctx.volatile_ctx.opening_region_guards.clear();
220
221 Ok(())
222 }
223}
224
#[cfg(test)]
mod tests {
    //! Unit tests for the candidate-to-leader metadata upgrade step.
    //!
    //! Fixtures: `test_util::new_persistent_context(1, 2, …)` sets up a
    //! migration from peer 1 (`from_peer`) to peer 2 (`to_peer`) — the peer
    //! ids asserted below rely on that convention.
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    // Migration of region (1024, 1) from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    // Fetching the route of a table that was never created must fail with a
    // non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        let err = ctx.get_table_route_value(1024).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // Building the upgrade metadata for a region id absent from the table's
    // routes must fail with a non-retryable `RegionRouteNotFound`.
    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        // Only region (1024, 2) exists; the migration targets (1024, 1).
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // If the persisted leader (peer 3) differs from `from_peer` (peer 1), the
    // upgrade must abort with a non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    // Happy path: downgrading leader 1 is replaced by candidate 2, the
    // candidate is removed from the followers, and the downgrading state
    // (including `leader_down_since`) is cleared.
    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    // Leader is still `from_peer`: the check must report "not yet updated".
    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(!updated);
    }

    // Leader is already `to_peer` with a clean state: the check must report
    // "updated" so the upgrade step is skipped on retry.
    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(updated);
    }

    // Leader is `to_peer` but still marked `Downgrading`: a persisted
    // intermediate state is a bug and must surface as `Unexpected`.
    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    // End-to-end: the Upgrade state transitions to `CloseDowngradedRegion`,
    // persists the new route (leader 2, no followers, not downgrading), and
    // releases the opening-region guards.
    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        // Simulate the candidate region being held open on peer 2.
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        // Re-read the persisted route to verify the upgrade took effect.
        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}
502}