meta_srv/procedure/region_migration/update_metadata/
downgrade_leader_region.rs1use common_error::ext::BoxedError;
16use common_meta::rpc::router::LeaderState;
17use snafu::ResultExt;
18
19use crate::error::{self, Result};
20use crate::procedure::region_migration::update_metadata::UpdateMetadata;
21use crate::procedure::region_migration::Context;
22
23impl UpdateMetadata {
24 pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {
41 let table_metadata_manager = ctx.table_metadata_manager.clone();
42 let from_peer_id = ctx.persistent_ctx.from_peer.id;
43 let region_id = ctx.region_id();
44 let table_id = region_id.table_id();
45 let current_table_route_value = ctx.get_table_route_value().await?;
46
47 if let Err(err) = table_metadata_manager
49 .update_leader_region_status(table_id, current_table_route_value, |route| {
50 if route.region.id == region_id
51 && route
52 .leader_peer
53 .as_ref()
54 .is_some_and(|leader_peer| leader_peer.id == from_peer_id)
55 {
56 Some(Some(LeaderState::Downgrading))
57 } else {
58 None
59 }
60 })
61 .await
62 .context(error::TableMetadataManagerSnafu)
63 {
64 ctx.remove_table_route_value();
65 return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
66 reason: format!(
67 "Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
68 ),
69 });
70 }
71
72 ctx.remove_table_route_value();
73
74 Ok(())
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use std::assert_matches::assert_matches;
81
82 use common_meta::key::test_utils::new_test_table_info;
83 use common_meta::peer::Peer;
84 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
85 use store_api::storage::RegionId;
86
87 use crate::error::Error;
88 use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
89 use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
90 use crate::procedure::region_migration::update_metadata::UpdateMetadata;
91 use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
92
93 fn new_persistent_context() -> PersistentContext {
94 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
95 }
96
97 #[test]
98 fn test_state_serialization() {
99 let state = UpdateMetadata::Downgrade;
100 let expected = r#"{"UpdateMetadata":"Downgrade"}"#;
101 assert_eq!(expected, serde_json::to_string(&state).unwrap());
102 }
103
104 #[tokio::test]
105 async fn test_table_route_is_not_found_error() {
106 let state = UpdateMetadata::Downgrade;
107 let env = TestingEnv::new();
108 let persistent_context = new_persistent_context();
109 let mut ctx = env.context_factory().new_context(persistent_context);
110
111 let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
112
113 assert_matches!(err, Error::TableRouteNotFound { .. });
114
115 assert!(!err.is_retryable());
116 }
117
118 #[tokio::test]
119 async fn test_failed_to_update_table_route_error() {
120 let state = UpdateMetadata::Downgrade;
121 let persistent_context = new_persistent_context();
122 let from_peer = persistent_context.from_peer.clone();
123
124 let env = TestingEnv::new();
125 let mut ctx = env.context_factory().new_context(persistent_context);
126 let table_id = ctx.region_id().table_id();
127
128 let table_info = new_test_table_info(1024, vec![1, 2]).into();
129 let region_routes = vec![
130 RegionRoute {
131 region: Region::new_test(RegionId::new(1024, 1)),
132 leader_peer: Some(from_peer.clone()),
133 ..Default::default()
134 },
135 RegionRoute {
136 region: Region::new_test(RegionId::new(1024, 2)),
137 leader_peer: Some(Peer::empty(4)),
138 ..Default::default()
139 },
140 ];
141
142 env.create_physical_table_metadata(table_info, region_routes)
143 .await;
144
145 let table_metadata_manager = env.table_metadata_manager();
146 let original_table_route = table_metadata_manager
147 .table_route_manager()
148 .table_route_storage()
149 .get_with_raw_bytes(table_id)
150 .await
151 .unwrap()
152 .unwrap();
153
154 table_metadata_manager
156 .update_leader_region_status(table_id, &original_table_route, |route| {
157 if route.region.id == RegionId::new(1024, 2) {
158 Some(Some(LeaderState::Downgrading))
159 } else {
160 None
161 }
162 })
163 .await
164 .unwrap();
165
166 ctx.volatile_ctx.table_route = Some(original_table_route);
168
169 let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
170 assert!(ctx.volatile_ctx.table_route.is_none());
171 assert!(err.is_retryable());
172 assert!(format!("{err:?}").contains("Failed to update the table route"));
173 }
174
175 #[tokio::test]
176 async fn test_only_downgrade_from_peer() {
177 let mut state = Box::new(UpdateMetadata::Downgrade);
178 let persistent_context = new_persistent_context();
179
180 let env = TestingEnv::new();
181 let mut ctx = env.context_factory().new_context(persistent_context);
182 let table_id = ctx.region_id().table_id();
183
184 let table_info = new_test_table_info(1024, vec![1, 2]).into();
185 let region_routes = vec![RegionRoute {
186 region: Region::new_test(RegionId::new(1024, 1)),
187 leader_peer: Some(Peer::empty(1024)),
188 ..Default::default()
189 }];
190
191 env.create_physical_table_metadata(table_info, region_routes)
192 .await;
193
194 let table_metadata_manager = env.table_metadata_manager();
195
196 let procedure_ctx = new_procedure_context();
197 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
198
199 let _ = next
200 .as_any()
201 .downcast_ref::<DowngradeLeaderRegion>()
202 .unwrap();
203
204 let latest_table_route = table_metadata_manager
205 .table_route_manager()
206 .table_route_storage()
207 .get(table_id)
208 .await
209 .unwrap()
210 .unwrap();
211
212 assert_eq!(latest_table_route.version().unwrap(), 0);
214 assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
215 assert!(ctx.volatile_ctx.table_route.is_none());
216 }
217
218 #[tokio::test]
219 async fn test_next_downgrade_leader_region_state() {
220 let mut state = Box::new(UpdateMetadata::Downgrade);
221 let persistent_context = new_persistent_context();
222 let from_peer = persistent_context.from_peer.clone();
223
224 let env = TestingEnv::new();
225 let mut ctx = env.context_factory().new_context(persistent_context);
226 let table_id = ctx.region_id().table_id();
227
228 let table_info = new_test_table_info(1024, vec![1, 2]).into();
229 let region_routes = vec![RegionRoute {
230 region: Region::new_test(RegionId::new(1024, 1)),
231 leader_peer: Some(from_peer.clone()),
232 ..Default::default()
233 }];
234
235 env.create_physical_table_metadata(table_info, region_routes)
236 .await;
237
238 let table_metadata_manager = env.table_metadata_manager();
239
240 let procedure_ctx = new_procedure_context();
241 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
242
243 let _ = next
244 .as_any()
245 .downcast_ref::<DowngradeLeaderRegion>()
246 .unwrap();
247
248 let latest_table_route = table_metadata_manager
249 .table_route_manager()
250 .table_route_storage()
251 .get(table_id)
252 .await
253 .unwrap()
254 .unwrap();
255
256 assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
257 assert!(ctx.volatile_ctx.table_route.is_none());
258 }
259}