meta_srv/procedure/region_migration/update_metadata/
rollback_downgraded_region.rs1use common_error::ext::BoxedError;
16use common_meta::lock_key::TableLock;
17use common_procedure::ContextProviderRef;
18use common_telemetry::{error, info};
19use snafu::ResultExt;
20
21use crate::error::{self, Result};
22use crate::procedure::region_migration::Context;
23use crate::procedure::region_migration::update_metadata::UpdateMetadata;
24
25impl UpdateMetadata {
26 pub async fn rollback_downgraded_region(
35 &self,
36 ctx: &mut Context,
37 ctx_provider: &ContextProviderRef,
38 ) -> Result<()> {
39 let table_metadata_manager = ctx.table_metadata_manager.clone();
40 let table_regions = ctx.persistent_ctx.table_regions();
41
42 for (table_id, regions) in table_regions {
43 let table_lock = TableLock::Write(table_id).into();
44 let _guard = ctx_provider.acquire_lock(&table_lock).await;
45
46 let current_table_route_value = ctx.get_table_route_value(table_id).await?;
47 if let Err(err) = table_metadata_manager
48 .update_leader_region_status(table_id, ¤t_table_route_value, |route| {
49 if regions.contains(&route.region.id) {
50 Some(None)
51 } else {
52 None
53 }
54 })
55 .await
56 .context(error::TableMetadataManagerSnafu)
57 {
58 error!(err; "Failed to update the table route during the rollback downgraded leader regions: {regions:?}");
59 return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu {
60 reason: format!("Failed to update the table route during the rollback downgraded leader regions: {regions:?}"),
61 });
62 }
63 info!(
64 "Rolling back downgraded leader region table route success, table_id: {table_id}, regions: {regions:?}"
65 );
66 }
67 ctx.register_failure_detectors().await;
68
69 Ok(())
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use std::assert_matches::assert_matches;
76 use std::collections::HashMap;
77 use std::sync::Arc;
78
79 use common_meta::key::test_utils::new_test_table_info;
80 use common_meta::peer::Peer;
81 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
82 use common_procedure_test::MockContextProvider;
83 use store_api::storage::RegionId;
84
85 use crate::error::Error;
86 use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
87 use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
88 use crate::procedure::region_migration::update_metadata::UpdateMetadata;
89 use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
90
91 fn new_persistent_context() -> PersistentContext {
92 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
93 }
94
95 #[tokio::test]
96 async fn test_table_route_is_not_found_error() {
97 let state = UpdateMetadata::Rollback;
98 let env = TestingEnv::new();
99 let persistent_context = new_persistent_context();
100 let mut ctx = env.context_factory().new_context(persistent_context);
101
102 let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _;
103 let err = state
104 .rollback_downgraded_region(&mut ctx, &provider)
105 .await
106 .unwrap_err();
107
108 assert_matches!(err, Error::TableRouteNotFound { .. });
109
110 assert!(!err.is_retryable());
111 }
112
113 #[tokio::test]
114 async fn test_next_migration_end_state() {
115 let mut state = Box::new(UpdateMetadata::Rollback);
116 let persistent_context = new_persistent_context();
117 let from_peer = persistent_context.from_peer.clone();
118
119 let env = TestingEnv::new();
120 let mut ctx = env.context_factory().new_context(persistent_context);
121 let table_id = ctx.persistent_ctx.region_ids[0].table_id();
122
123 let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
124 let region_routes = vec![
125 RegionRoute {
126 region: Region::new_test(RegionId::new(1024, 1)),
127 leader_peer: Some(from_peer.clone()),
128 leader_state: Some(LeaderState::Downgrading),
129 ..Default::default()
130 },
131 RegionRoute {
132 region: Region::new_test(RegionId::new(1024, 2)),
133 leader_peer: Some(Peer::empty(4)),
134 leader_state: Some(LeaderState::Downgrading),
135 ..Default::default()
136 },
137 RegionRoute {
138 region: Region::new_test(RegionId::new(1024, 3)),
139 leader_peer: Some(Peer::empty(5)),
140 ..Default::default()
141 },
142 ];
143
144 let expected_region_routes = {
145 let mut region_routes = region_routes.clone();
146 region_routes[0].leader_state = None;
147 region_routes
148 };
149
150 env.create_physical_table_metadata(table_info, region_routes)
151 .await;
152
153 let table_metadata_manager = env.table_metadata_manager();
154
155 let procedure_ctx = new_procedure_context();
156 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
157
158 let _ = next
159 .as_any()
160 .downcast_ref::<RegionMigrationAbort>()
161 .unwrap();
162
163 let table_route = table_metadata_manager
164 .table_route_manager()
165 .table_route_storage()
166 .get(table_id)
167 .await
168 .unwrap()
169 .unwrap();
170 assert_eq!(
171 &expected_region_routes,
172 table_route.region_routes().unwrap()
173 );
174 }
175}