// meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

use common_error::ext::BoxedError;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::rpc::router::{RegionRoute, region_distribution};
use common_telemetry::{info, warn};
use snafu::{OptionExt, ResultExt, ensure};

use crate::error::{self, Result};
use crate::procedure::region_migration::Context;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;

impl UpdateMetadata {
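    /// Builds a new set of [RegionRoute]s in which the candidate peer
    /// (`to_peer`) replaces the expected old leader (`from_peer`) as the
    /// leader of the migrating region.
    ///
    /// Returns an error if the region route is missing or if the current
    /// leader is not the expected old leader.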
    async fn build_upgrade_candidate_region_metadata(
        &self,
        ctx: &mut Context,
    ) -> Result<Vec<RegionRoute>> {
        let region_id = ctx.region_id();
        let table_route_value = ctx.get_table_route_value().await?.clone();

        let mut region_routes = table_route_value
            .region_routes()
            .context(error::UnexpectedLogicalRouteTableSnafu {
                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
            })?
            .clone();
        let region_route = region_routes
            .iter_mut()
            .find(|route| route.region.id == region_id)
            .context(error::RegionRouteNotFoundSnafu { region_id })?;

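        // Clear the `Downgrading` marker (and the `leader_down_since`
        // timestamp it carries) on the route that is about to be promoted.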
        region_route.set_leader_state(None);

        let candidate = &ctx.persistent_ctx.to_peer;
        let expected_old_leader = &ctx.persistent_ctx.from_peer;

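        // `take_if` clears `leader_peer` only when it matches the expected old
        // leader; any other leader means the route changed underneath us.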
        ensure!(
            region_route
                .leader_peer
                .take_if(|old_leader| old_leader.id == expected_old_leader.id)
                .is_some(),
            error::UnexpectedSnafu {
                violated: format!(
                    "Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}",
                    region_route.leader_peer, expected_old_leader
                ),
            }
        );

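        // Install the candidate as the new leader.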
        region_route.leader_peer = Some(candidate.clone());
        info!(
            "Upgrading candidate region to leader region: {:?} for region: {}",
            candidate, region_id
        );

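        // The new leader must not also appear in the follower list; drain any
        // entries that match the candidate peer.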
        let removed = region_route
            .follower_peers
            .extract_if(.., |peer| peer.id == candidate.id)
            .collect::<Vec<_>>();

        if removed.len() > 1 {
            warn!(
                "Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}"
            );
        }

        Ok(region_routes)
    }

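    /// Returns `true` if the table route already records the candidate peer as
    /// the region's leader, i.e. a previous attempt has persisted the update.
    ///
    /// Returns an error if the route is still marked as downgrading, which
    /// would be an inconsistent intermediate state.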
    async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> {
        let region_id = ctx.region_id();
        let table_route_value = ctx.get_table_route_value().await?.clone();

        let region_routes = table_route_value
            .region_routes()
            .context(error::UnexpectedLogicalRouteTableSnafu {
                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
            })?
            .clone();
        let region_route = region_routes
            .into_iter()
            .find(|route| route.region.id == region_id)
            .context(error::RegionRouteNotFoundSnafu { region_id })?;

        let leader_peer = region_route
            .leader_peer
            .as_ref()
            .context(error::UnexpectedSnafu {
                violated: format!(
                    "The leader peer of region {region_id} is not found during the update metadata for upgrading"
                ),
            })?;

        let candidate_peer_id = ctx.persistent_ctx.to_peer.id;

        if leader_peer.id == candidate_peer_id {
            ensure!(
                !region_route.is_leader_downgrading(),
                error::UnexpectedSnafu {
                    violated: format!(
                        "Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"
                    ),
                }
            );

            Ok(true)
        } else {
            Ok(false)
        }
    }

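    /// Upgrades the candidate region to the leader region by persisting the
    /// rebuilt region routes into the table route metadata.
    ///
    /// This step is idempotent: if an earlier attempt already updated the
    /// metadata, it returns immediately. On a failed update it drops the
    /// cached table route and schedules a retry.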
    pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.region_id();
        let table_metadata_manager = ctx.table_metadata_manager.clone();

        if self.check_metadata_updated(ctx).await? {
            return Ok(());
        }

        let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?;
        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
        let RegionInfo {
            region_storage_path,
            region_options,
            region_wal_options,
            engine,
        } = datanode_table_value.region_info.clone();
        let table_route_value = ctx.get_table_route_value().await?;

        let region_distribution = region_distribution(&region_routes);
        info!(
            "Trying to update region routes to {:?} for table: {}",
            region_distribution,
            region_id.table_id()
        );
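        // `update_table_route` compares against the table route value cached
        // in the context; if the stored route has changed in the meantime, the
        // update fails, the stale cache is dropped, and the whole step is
        // retried later.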
        if let Err(err) = table_metadata_manager
            .update_table_route(
                region_id.table_id(),
                RegionInfo {
                    engine: engine.to_string(),
                    region_storage_path: region_storage_path.to_string(),
                    region_options: region_options.clone(),
                    region_wal_options: region_wal_options.clone(),
                },
                table_route_value,
                region_routes,
                &region_options,
                &region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
        {
            ctx.remove_table_route_value();
            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to update the table route during the upgrading candidate region: {region_id}"
                ),
            });
        };

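        // Success: invalidate the cached table route, deregister the failure
        // detectors for the migrated region, and release the opening-region
        // guard now that the candidate owns the region.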
        ctx.remove_table_route_value();
        ctx.deregister_failure_detectors().await;
        ctx.volatile_ctx.opening_region_guard.take();

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

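    /// A [PersistentContext] migrating `RegionId(1024, 1)` from peer 1 to peer 2.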
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = UpdateMetadata::Upgrade;

        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![2]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(5), Peer::empty(3)],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(current_time_millis()),
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let original_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

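        // Mutate the stored table route (clear the downgrading state of region
        // 2) so that it no longer matches the route cached below, forcing the
        // table route update to fail.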
        table_metadata_manager
            .update_leader_region_status(table_id, &original_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    Some(None)
                } else {
                    None
                }
            })
            .await
            .unwrap();

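        // Seed the context with the now-stale route; `upgrade_candidate_region`
        // must fail with a retryable error and keep the opening-region guard.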
        ctx.volatile_ctx.table_route = Some(original_table_route);
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);
        let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_some());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
    }

    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

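        // Simulate the candidate datanode (peer 2) holding the opening-region
        // guard for the migrating region.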
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_none());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}