meta_srv/procedure/region_migration/update_metadata/
downgrade_leader_region.rs1use common_error::ext::BoxedError;
16use common_meta::lock_key::TableLock;
17use common_meta::rpc::router::LeaderState;
18use common_procedure::ContextProviderRef;
19use common_telemetry::{error, info};
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23use crate::procedure::region_migration::Context;
24use crate::procedure::region_migration::update_metadata::UpdateMetadata;
25
26impl UpdateMetadata {
27 pub async fn downgrade_leader_region(
44 &self,
45 ctx: &mut Context,
46 ctx_provider: &ContextProviderRef,
47 ) -> Result<()> {
48 let table_metadata_manager = ctx.table_metadata_manager.clone();
49 let from_peer_id = ctx.persistent_ctx.from_peer.id;
50 let table_regions = ctx.persistent_ctx.table_regions();
51
52 for (table_id, regions) in table_regions {
53 let table_lock = TableLock::Write(table_id).into();
54 let _guard = ctx_provider.acquire_lock(&table_lock).await;
55
56 let current_table_route_value = ctx.get_table_route_value(table_id).await?;
57 if let Err(err) = table_metadata_manager
58 .update_leader_region_status(table_id, ¤t_table_route_value, |route| {
59 if regions.contains(&route.region.id)
60 && route
61 .leader_peer
62 .as_ref()
63 .is_some_and(|leader_peer| leader_peer.id == from_peer_id)
64 {
65 Some(Some(LeaderState::Downgrading))
66 } else {
67 None
68 }
69 })
70 .await
71 .context(error::TableMetadataManagerSnafu)
72 {
73 error!(err; "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}");
74 return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu {
75 reason: format!(
76 "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}"
77 ),
78 });
79 }
80 info!(
81 "Downgrading leader region table route success, table_id: {table_id}, regions: {regions:?}, from_peer_id: {from_peer_id}"
82 );
83 }
84
85 Ok(())
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use std::assert_matches::assert_matches;
92 use std::collections::HashMap;
93 use std::sync::Arc;
94
95 use common_meta::key::test_utils::new_test_table_info;
96 use common_meta::peer::Peer;
97 use common_meta::rpc::router::{Region, RegionRoute};
98 use common_procedure_test::MockContextProvider;
99 use store_api::storage::RegionId;
100
101 use crate::error::Error;
102 use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
103 use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
104 use crate::procedure::region_migration::update_metadata::UpdateMetadata;
105 use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
106
107 fn new_persistent_context() -> PersistentContext {
108 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
109 }
110
111 #[test]
112 fn test_state_serialization() {
113 let state = UpdateMetadata::Downgrade;
114 let expected = r#"{"UpdateMetadata":"Downgrade"}"#;
115 assert_eq!(expected, serde_json::to_string(&state).unwrap());
116 }
117
118 #[tokio::test]
119 async fn test_table_route_is_not_found_error() {
120 let state = UpdateMetadata::Downgrade;
121 let env = TestingEnv::new();
122 let persistent_context = new_persistent_context();
123 let mut ctx = env.context_factory().new_context(persistent_context);
124 let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _;
125
126 let err = state
127 .downgrade_leader_region(&mut ctx, &provider)
128 .await
129 .unwrap_err();
130
131 assert_matches!(err, Error::TableRouteNotFound { .. });
132
133 assert!(!err.is_retryable());
134 }
135
136 #[tokio::test]
137 async fn test_only_downgrade_from_peer() {
138 let mut state = Box::new(UpdateMetadata::Downgrade);
139 let persistent_context = new_persistent_context();
140
141 let env = TestingEnv::new();
142 let mut ctx = env.context_factory().new_context(persistent_context);
143 let table_id = ctx.persistent_ctx.region_ids[0].table_id();
144
145 let table_info = new_test_table_info(1024, vec![1, 2]).into();
146 let region_routes = vec![RegionRoute {
147 region: Region::new_test(RegionId::new(1024, 1)),
148 leader_peer: Some(Peer::empty(1024)),
149 ..Default::default()
150 }];
151
152 env.create_physical_table_metadata(table_info, region_routes)
153 .await;
154
155 let table_metadata_manager = env.table_metadata_manager();
156
157 let procedure_ctx = new_procedure_context();
158 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
159
160 let _ = next
161 .as_any()
162 .downcast_ref::<DowngradeLeaderRegion>()
163 .unwrap();
164
165 let latest_table_route = table_metadata_manager
166 .table_route_manager()
167 .table_route_storage()
168 .get(table_id)
169 .await
170 .unwrap()
171 .unwrap();
172
173 assert_eq!(latest_table_route.version().unwrap(), 0);
175 assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
176 }
177
178 #[tokio::test]
179 async fn test_next_downgrade_leader_region_state() {
180 let mut state = Box::new(UpdateMetadata::Downgrade);
181 let persistent_context = new_persistent_context();
182 let from_peer = persistent_context.from_peer.clone();
183
184 let env = TestingEnv::new();
185 let mut ctx = env.context_factory().new_context(persistent_context);
186 let table_id = ctx.persistent_ctx.region_ids[0].table_id();
187
188 let table_info = new_test_table_info(1024, vec![1, 2]).into();
189 let region_routes = vec![RegionRoute {
190 region: Region::new_test(RegionId::new(1024, 1)),
191 leader_peer: Some(from_peer.clone()),
192 ..Default::default()
193 }];
194
195 env.create_physical_table_metadata(table_info, region_routes)
196 .await;
197
198 let table_metadata_manager = env.table_metadata_manager();
199
200 let procedure_ctx = new_procedure_context();
201 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
202
203 let _ = next
204 .as_any()
205 .downcast_ref::<DowngradeLeaderRegion>()
206 .unwrap();
207
208 let latest_table_route = table_metadata_manager
209 .table_route_manager()
210 .table_route_storage()
211 .get(table_id)
212 .await
213 .unwrap()
214 .unwrap();
215
216 assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
217 }
218}