meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

use common_error::ext::BoxedError;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::lock_key::TableLock;
use common_meta::rpc::router::{RegionRoute, region_distribution};
use common_procedure::ContextProviderRef;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionId;

use crate::error::{self, Result};
use crate::procedure::region_migration::Context;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;

impl UpdateMetadata {
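    /// Returns the new [`Vec<RegionRoute>`] for the given regions: the leader
    /// state is cleared, leadership moves from the `from_peer` to the
    /// `to_peer`, and the `to_peer` is removed from the followers.
    ///
    /// Abort(non-retry):
    /// - The region route of a region is not found.
    /// - The current leader of a region is not the expected `from_peer`.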
    fn build_upgrade_candidate_region_metadata(
        &self,
        ctx: &mut Context,
        region_ids: &[RegionId],
        mut region_routes: Vec<RegionRoute>,
    ) -> Result<Vec<RegionRoute>> {
        let old_leader_peer = &ctx.persistent_ctx.from_peer;
        let new_leader_peer = &ctx.persistent_ctx.to_peer;
        for region_id in region_ids {
            let region_route = region_routes
                .iter_mut()
                .find(|route| route.region.id == *region_id)
                .context(error::RegionRouteNotFoundSnafu {
                    region_id: *region_id,
                })?;

            // Clears the leader state (e.g., `Downgrading`) and the
            // `leader_down_since` timestamp.
            region_route.set_leader_state(None);

            // The current leader must be the old leader (`from_peer`);
            // otherwise another procedure has interfered with the migration.
            ensure!(
                region_route
                    .leader_peer
                    .take_if(|old_leader| old_leader.id == old_leader_peer.id)
                    .is_some(),
                error::UnexpectedSnafu {
                    violated: format!(
                        "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
                        region_route.leader_peer, old_leader_peer
                    ),
                }
            );

            // Promotes the candidate (`to_peer`) to the region leader.
            region_route.leader_peer = Some(new_leader_peer.clone());

            // Removes the candidate from the followers, since it is now the leader.
            let removed = region_route
                .follower_peers
                .extract_if(.., |peer| peer.id == new_leader_peer.id)
                .collect::<Vec<_>>();

            if removed.len() > 1 {
                warn!(
                    "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
                );
            }
        }

        info!(
            "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
            new_leader_peer, region_ids,
        );

        Ok(region_routes)
    }

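    /// Returns `true` if the metadata has already been updated, i.e., the
    /// leader peer of every given region is the `to_peer`.
    ///
    /// Abort(non-retry):
    /// - The region route of a region is not found.
    /// - The leader peer of a region route is not found.
    /// - A region whose leader is already the `to_peer` is still in the
    ///   `Downgrading` state.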
    fn check_metadata_updated(
        &self,
        ctx: &mut Context,
        region_ids: &[RegionId],
        region_routes: &[RegionRoute],
    ) -> Result<bool> {
        for region_id in region_ids {
            let region_route = region_routes
                .iter()
                .find(|route| route.region.id == *region_id)
                .context(error::RegionRouteNotFoundSnafu {
                    region_id: *region_id,
                })?;

            let leader_peer = region_route
                .leader_peer
                .as_ref()
                .with_context(|| error::UnexpectedSnafu {
                    violated: format!(
                        "The leader peer of region {region_id} is not found during the metadata upgrade check"
                    ),
                })?;

            if leader_peer.id != ctx.persistent_ctx.to_peer.id {
                return Ok(false);
            } else {
                // The candidate is already the leader, but it must not be left
                // in an intermediate (downgrading) state.
                ensure!(
                    !region_route.is_leader_downgrading(),
                    error::UnexpectedSnafu {
                        violated: format!(
                            "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
                        ),
                    }
                );
            }
        }

        Ok(true)
    }

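    /// Upgrades the candidate regions to leaders by updating the table route
    /// metadata, then deregisters the failure detectors of the migrated
    /// regions and releases the opening-region guards.
    ///
    /// Retry:
    /// - Failed to update the table route.
    ///
    /// Abort(non-retry):
    /// - The table route is not found, or is a non-physical table route.
    /// - Any failure from [`Self::check_metadata_updated`] or
    ///   [`Self::build_upgrade_candidate_region_metadata`].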
    pub async fn upgrade_candidate_region(
        &self,
        ctx: &mut Context,
        ctx_provider: &ContextProviderRef,
    ) -> Result<()> {
        let table_metadata_manager = ctx.table_metadata_manager.clone();
        let table_regions = ctx.persistent_ctx.table_regions();
        let from_peer_id = ctx.persistent_ctx.from_peer.id;
        let to_peer_id = ctx.persistent_ctx.to_peer.id;

        for (table_id, region_ids) in table_regions {
            let table_lock = TableLock::Write(table_id).into();
            let _guard = ctx_provider.acquire_lock(&table_lock).await;

            let table_route_value = ctx.get_table_route_value(table_id).await?;
            let region_routes = table_route_value.region_routes().with_context(|_| {
                error::UnexpectedLogicalRouteTableSnafu {
                    err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
                }
            })?;
            // Skips the table if its metadata has already been updated.
            if self.check_metadata_updated(ctx, &region_ids, region_routes)? {
                continue;
            }
            let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
            let RegionInfo {
                region_storage_path,
                region_options,
                region_wal_options,
                engine,
            } = datanode_table_value.region_info.clone();
            let new_region_routes = self.build_upgrade_candidate_region_metadata(
                ctx,
                &region_ids,
                region_routes.clone(),
            )?;
            // Logs the distribution of the routes being written, matching the
            // "update region routes to" message below.
            let region_distribution = region_distribution(&new_region_routes);
            info!(
                "Trying to update region routes to {:?} for table: {}",
                region_distribution, table_id,
            );

            if let Err(err) = table_metadata_manager
                .update_table_route(
                    table_id,
                    RegionInfo {
                        engine: engine.clone(),
                        region_storage_path: region_storage_path.clone(),
                        region_options: region_options.clone(),
                        region_wal_options: region_wal_options.clone(),
                    },
                    &table_route_value,
                    new_region_routes,
                    &region_options,
                    &region_wal_options,
                )
                .await
                .context(error::TableMetadataManagerSnafu)
            {
                error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
                return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
                });
            };
            info!(
                "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
            );
        }

        ctx.deregister_failure_detectors().await;
        // The opening-region guards are no longer needed once the candidate
        // regions have become leaders.
        ctx.volatile_ctx.opening_region_guards.clear();

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

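    /// Creates a [`PersistentContext`] that migrates `RegionId::new(1024, 1)`
    /// from peer 1 to peer 2.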
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        let err = ctx.get_table_route_value(1024).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![2]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

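    /// Verifies that `UpdateMetadata::Upgrade` persists the new region routes,
    /// releases the opening-region guards, and transitions to
    /// [`CloseDowngradedRegion`].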
    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}