meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

use common_error::ext::BoxedError;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::lock_key::TableLock;
use common_meta::rpc::router::{RegionRoute, region_distribution};
use common_procedure::ContextProviderRef;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionId;

use crate::error::{self, Result};
use crate::procedure::region_migration::Context;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;

impl UpdateMetadata {
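    /// Returns the new region routes in which the candidate peer (`to_peer`) becomes the leader
    /// of every given region.
    ///
    /// For each region, it clears the leader state, verifies that the current leader is still the
    /// `from_peer`, promotes the `to_peer` to leader, and removes the `to_peer` from the follower
    /// peers.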
    fn build_upgrade_candidate_region_metadata(
        &self,
        ctx: &mut Context,
        region_ids: &[RegionId],
        mut region_routes: Vec<RegionRoute>,
    ) -> Result<Vec<RegionRoute>> {
        let old_leader_peer = &ctx.persistent_ctx.from_peer;
        let new_leader_peer = &ctx.persistent_ctx.to_peer;
        for region_id in region_ids {
            let region_route = region_routes
                .iter_mut()
                .find(|route| route.region.id == *region_id)
                .context(error::RegionRouteNotFoundSnafu {
                    region_id: *region_id,
                })?;

            region_route.set_leader_state(None);

            ensure!(
                region_route
                    .leader_peer
                    .take_if(|old_leader| old_leader.id == old_leader_peer.id)
                    .is_some(),
                error::UnexpectedSnafu {
                    violated: format!(
                        "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
                        region_route.leader_peer, old_leader_peer
                    ),
                }
            );

            region_route.leader_peer = Some(new_leader_peer.clone());

            let removed = region_route
                .follower_peers
                .extract_if(.., |peer| peer.id == new_leader_peer.id)
                .collect::<Vec<_>>();

            if removed.len() > 1 {
                warn!(
                    "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
                );
            }
        }

        info!(
            "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
            new_leader_peer, region_ids,
        );

        Ok(region_routes)
    }

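    /// Returns `true` if the table metadata has already been updated, i.e., the candidate peer
    /// (`to_peer`) is the leader of every given region.
    ///
    /// Returns an error if a region route is missing, a region has no leader peer, or a region is
    /// still in the downgrading state.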
    fn check_metadata_updated(
        &self,
        ctx: &mut Context,
        region_ids: &[RegionId],
        region_routes: &[RegionRoute],
    ) -> Result<bool> {
        for region_id in region_ids {
            let region_route = region_routes
                .iter()
                .find(|route| route.region.id == *region_id)
                .context(error::RegionRouteNotFoundSnafu {
                    region_id: *region_id,
                })?;

            let leader_peer = region_route.leader_peer.as_ref().with_context(|| {
                error::UnexpectedSnafu {
                    violated: format!(
                        "The leader peer of region {region_id} is not found during the metadata upgrade check"
                    ),
                }
            })?;

            if leader_peer.id != ctx.persistent_ctx.to_peer.id {
                return Ok(false);
            } else {
                ensure!(
                    !region_route.is_leader_downgrading(),
                    error::UnexpectedSnafu {
                        violated: format!(
                            "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
                        ),
                    }
                );
            }
        }

        Ok(true)
    }

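    /// Upgrades the candidate regions to leaders by updating the table route metadata of every
    /// involved table.
    ///
    /// For each table, it acquires the table write lock, skips the table if its metadata is
    /// already updated, rebuilds the region routes via `build_upgrade_candidate_region_metadata`,
    /// and persists the new table route. A failed table route update yields a retry-later error.
    /// On success, it deregisters the failure detectors and clears the opening region guards.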
    pub async fn upgrade_candidate_region(
        &self,
        ctx: &mut Context,
        ctx_provider: &ContextProviderRef,
    ) -> Result<()> {
        let table_metadata_manager = ctx.table_metadata_manager.clone();
        let table_regions = ctx.persistent_ctx.table_regions();
        let from_peer_id = ctx.persistent_ctx.from_peer.id;
        let to_peer_id = ctx.persistent_ctx.to_peer.id;

        for (table_id, region_ids) in table_regions {
            let table_lock = TableLock::Write(table_id).into();
            let _guard = ctx_provider.acquire_lock(&table_lock).await;

            let table_route_value = ctx.get_table_route_value(table_id).await?;
            let region_routes = table_route_value.region_routes().with_context(|_| {
                error::UnexpectedLogicalRouteTableSnafu {
                    err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
                }
            })?;
            if self.check_metadata_updated(ctx, &region_ids, region_routes)? {
                continue;
            }
            let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
            let RegionInfo {
                region_storage_path,
                region_options,
                region_wal_options,
                engine,
            } = datanode_table_value.region_info.clone();
            let new_region_routes = self.build_upgrade_candidate_region_metadata(
                ctx,
                &region_ids,
                region_routes.clone(),
            )?;
            let region_distribution = region_distribution(&new_region_routes);
            info!(
                "Trying to update region routes to {:?} for table: {}",
                region_distribution, table_id,
            );

            if let Err(err) = table_metadata_manager
                .update_table_route(
                    table_id,
                    RegionInfo {
                        engine: engine.clone(),
                        region_storage_path: region_storage_path.clone(),
                        region_options: region_options.clone(),
                        region_wal_options: region_wal_options.clone(),
                    },
                    &table_route_value,
                    new_region_routes,
                    &region_options,
                    &region_wal_options,
                )
                .await
                .context(error::TableMetadataManagerSnafu)
            {
                error!(err; "Failed to update the table route while upgrading the candidate regions: {region_ids:?}, from_peer_id: {from_peer_id}");
                return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                    reason: format!(
                        "Failed to update the table route while upgrading the candidate regions of table {table_id}"
                    ),
                });
            };
            info!(
                "Updated the table route for the candidate regions, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
            );
        }

        ctx.deregister_failure_detectors().await;
        ctx.volatile_ctx.opening_region_guards.clear();

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

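    /// Creates a context that migrates region 1 of table 1024 from peer 1 to peer 2.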
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        let err = ctx.get_table_route_value(1024).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

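    /// Upgrading should transition to the `CloseDowngradedRegion` state, release the opening
    /// region guards, and persist the candidate peer (peer 2) as the new leader in the table
    /// route.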
    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        let guard = opening_keeper
            .register_with_role(2, RegionId::new(table_id, 1), RegionRole::Follower)
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}