meta_srv/procedure/region_migration/update_metadata/
rollback_downgraded_region.rs1use common_error::ext::BoxedError;
16use snafu::ResultExt;
17
18use crate::error::{self, Result};
19use crate::procedure::region_migration::update_metadata::UpdateMetadata;
20use crate::procedure::region_migration::Context;
21
22impl UpdateMetadata {
23 pub async fn rollback_downgraded_region(&self, ctx: &mut Context) -> Result<()> {
32 let table_metadata_manager = ctx.table_metadata_manager.clone();
33 let region_id = ctx.region_id();
34 let table_id = region_id.table_id();
35 let current_table_route_value = ctx.get_table_route_value().await?;
36
37 if let Err(err) = table_metadata_manager
38 .update_leader_region_status(table_id, current_table_route_value, |route| {
39 if route.region.id == region_id {
40 Some(None)
41 } else {
42 None
43 }
44 })
45 .await
46 .context(error::TableMetadataManagerSnafu)
47 {
48 ctx.remove_table_route_value();
49 return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
50 reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
51 });
52 }
53
54 ctx.register_failure_detectors().await;
55 ctx.remove_table_route_value();
56
57 Ok(())
58 }
59}
60
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
    use crate::region::supervisor::RegionFailureDetectorControl;

    /// Builds the persistent context shared by these tests, targeting region `1024:1`.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Rolling back when no table metadata exists must fail with a non-retryable
    /// `TableRouteNotFound` error.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = UpdateMetadata::Rollback;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // Exercise the rollback path (the subject of this module), not the downgrade path.
        let err = state.rollback_downgraded_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });

        assert!(!err.is_retryable());
    }

    /// A stale cached table route makes the first rollback fail with a retryable
    /// error; the retry then succeeds and registers the failure detectors.
    #[tokio::test]
    async fn test_update_table_route_with_retry() {
        let state = UpdateMetadata::Rollback;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let (tx, mut rx) = tokio::sync::mpsc::channel(8);
        ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
        let table_id = ctx.region_id().table_id();

        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(from_peer.clone()),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 3)),
                leader_peer: Some(Peer::empty(5)),
                ..Default::default()
            },
        ];

        // Region 2 is cleared by the manual update below, region 1 by the rollback.
        let expected_region_routes = {
            let mut region_routes = region_routes.clone();
            region_routes[0].leader_state = None;
            region_routes[1].leader_state = None;
            region_routes
        };

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let old_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // Modify the stored table route so that `old_table_route` becomes stale and
        // the first rollback attempt below fails.
        table_metadata_manager
            .update_leader_region_status(table_id, &old_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    Some(None)
                } else {
                    None
                }
            })
            .await
            .unwrap();

        ctx.volatile_ctx.table_route = Some(old_table_route);

        let err = state
            .rollback_downgraded_region(&mut ctx)
            .await
            .unwrap_err();
        // The stale cached route must be dropped so the retry re-fetches it.
        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
        // No failure detectors are registered on the failed attempt.
        assert_eq!(rx.len(), 0);

        state.rollback_downgraded_region(&mut ctx).await.unwrap();
        // The cached route is also cleared after a successful rollback.
        assert!(ctx.volatile_ctx.table_route.is_none());
        let event = rx.try_recv().unwrap();
        let detecting_regions = event.into_region_failure_detectors();
        assert_eq!(
            detecting_regions,
            vec![(from_peer.id, ctx.persistent_ctx.region_id)]
        );

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            &expected_region_routes,
            table_route.region_routes().unwrap()
        );
    }

    /// Driving the `Rollback` state transitions to `RegionMigrationAbort` and clears
    /// only the migrating region's leader state.
    #[tokio::test]
    async fn test_next_migration_end_state() {
        let mut state = Box::new(UpdateMetadata::Rollback);
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_id = ctx.region_id().table_id();

        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(from_peer.clone()),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 3)),
                leader_peer: Some(Peer::empty(5)),
                ..Default::default()
            },
        ];

        // Only the migrating region (1024:1) is rolled back; region 2 keeps its state.
        let expected_region_routes = {
            let mut region_routes = region_routes.clone();
            region_routes[0].leader_state = None;
            region_routes
        };

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<RegionMigrationAbort>()
            .unwrap();

        assert!(ctx.volatile_ctx.table_route.is_none());

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            &expected_region_routes,
            table_route.region_routes().unwrap()
        );
    }
}