// meta_srv/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
use common_error::ext::BoxedError;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::Context;
impl UpdateMetadata {
pub async fn rollback_downgraded_region(&self, ctx: &mut Context) -> Result<()> {
let table_metadata_manager = ctx.table_metadata_manager.clone();
let region_id = ctx.region_id();
let table_id = region_id.table_id();
let current_table_route_value = ctx.get_table_route_value().await?;
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, current_table_route_value, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
{
ctx.remove_table_route_value();
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
});
}
ctx.register_failure_detectors().await;
ctx.remove_table_route_value();
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;
use crate::error::Error;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
use crate::region::supervisor::RegionFailureDetectorControl;
/// Builds the persistent context shared by the tests in this module, targeting region
/// `(1024, 1)`.
///
/// NOTE(review): the leading `1` and `2` are presumably the source and destination peer
/// ids; confirm against `test_util::new_persistent_context`.
fn new_persistent_context() -> PersistentContext {
    let migrating_region = RegionId::new(1024, 1);
    test_util::new_persistent_context(1, 2, migrating_region)
}
/// Rolling back when no table metadata has been created must fail with a non-retryable
/// `Error::TableRouteNotFound`.
#[tokio::test]
async fn test_table_route_is_not_found_error() {
    let state = UpdateMetadata::Rollback;
    // No table metadata is created in this environment, so there is no route to load.
    let env = TestingEnv::new();
    let persistent_context = new_persistent_context();
    let mut ctx = env.context_factory().new_context(persistent_context);

    // Exercise the rollback path itself; calling `downgrade_leader_region` here would leave
    // the missing-route behaviour of `rollback_downgraded_region` untested.
    let err = state
        .rollback_downgraded_region(&mut ctx)
        .await
        .unwrap_err();

    assert_matches!(err, Error::TableRouteNotFound { .. });
    assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_update_table_route_with_retry() {
let state = UpdateMetadata::Rollback;
let persistent_context = new_persistent_context();
let from_peer = persistent_context.from_peer.clone();
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
let table_id = ctx.region_id().table_id();
let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(from_peer.clone()),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(4)),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(Peer::empty(5)),
..Default::default()
},
];
let expected_region_routes = {
let mut region_routes = region_routes.clone();
region_routes[0].leader_state = None;
region_routes[1].leader_state = None;
region_routes
};
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_metadata_manager = env.table_metadata_manager();
let old_table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.unwrap()
.unwrap();
table_metadata_manager
.update_leader_region_status(table_id, &old_table_route, |route| {
if route.region.id == RegionId::new(1024, 2) {
Some(None)
} else {
None
}
})
.await
.unwrap();
ctx.volatile_ctx.table_route = Some(old_table_route);
let err = state
.rollback_downgraded_region(&mut ctx)
.await
.unwrap_err();
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(err.is_retryable());
assert!(format!("{err:?}").contains("Failed to update the table route"));
assert_eq!(rx.len(), 0);
state.rollback_downgraded_region(&mut ctx).await.unwrap();
let event = rx.try_recv().unwrap();
let detecting_regions = event.into_region_failure_detectors();
assert_eq!(
detecting_regions,
vec![(from_peer.id, ctx.persistent_ctx.region_id)]
);
let table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
.unwrap();
assert_eq!(
&expected_region_routes,
table_route.region_routes().unwrap()
);
}
/// Driving the `Rollback` state through `next` must yield `RegionMigrationAbort`, drop the
/// cached table route and clear the leader state of the migrating region only.
#[tokio::test]
async fn test_next_migration_end_state() {
    let mut state = Box::new(UpdateMetadata::Rollback);
    let persistent_context = new_persistent_context();
    let from_peer = persistent_context.from_peer.clone();
    let env = TestingEnv::new();
    let mut ctx = env.context_factory().new_context(persistent_context);

    let table_id = ctx.region_id().table_id();
    let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
    let region_routes = vec![
        RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(from_peer.clone()),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        },
        RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        },
        RegionRoute {
            region: Region::new_test(RegionId::new(1024, 3)),
            leader_peer: Some(Peer::empty(5)),
            ..Default::default()
        },
    ];

    // Only the first route (the migrating region) is expected to lose its leader state;
    // the second route keeps `Downgrading`.
    let mut expected_region_routes = region_routes.clone();
    expected_region_routes[0].leader_state = None;

    env.create_physical_table_metadata(table_info, region_routes)
        .await;
    let table_metadata_manager = env.table_metadata_manager();

    let (next, _) = state.next(&mut ctx).await.unwrap();
    // The unwrap fails the test unless the next state is `RegionMigrationAbort`.
    let _abort: &RegionMigrationAbort = next
        .as_any()
        .downcast_ref::<RegionMigrationAbort>()
        .unwrap();
    assert!(ctx.volatile_ctx.table_route.is_none());

    let stored_table_route = table_metadata_manager
        .table_route_manager()
        .table_route_storage()
        .get(table_id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(
        &expected_region_routes,
        stored_table_route.region_routes().unwrap()
    );
}
}