meta_srv/procedure/region_migration/update_metadata/
downgrade_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::rpc::router::LeaderState;
17use snafu::ResultExt;
18
19use crate::error::{self, Result};
20use crate::procedure::region_migration::update_metadata::UpdateMetadata;
21use crate::procedure::region_migration::Context;
22
23impl UpdateMetadata {
24    /// Downgrades the leader region.
25    ///
26    /// Abort(non-retry):
27    /// - TableRoute is not found.
28    ///
29    /// Retry:
30    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
31    /// - Failed to retrieve the metadata of table.
32    ///
33    /// About the failure of updating the [TableRouteValue](common_meta::key::table_region::TableRegionValue):
34    ///
35    /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure)
36    ///   that is executed concurrently for **other region**.
37    ///   It will only update **other region** info. Therefore, It's safe to retry after failure.
38    ///
39    /// - There is no other DDL procedure executed concurrently for the current table.
40    pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {
41        let table_metadata_manager = ctx.table_metadata_manager.clone();
42        let from_peer_id = ctx.persistent_ctx.from_peer.id;
43        let region_id = ctx.region_id();
44        let table_id = region_id.table_id();
45        let current_table_route_value = ctx.get_table_route_value().await?;
46
47        // TODO(weny): ensures the leader region peer is the `from_peer`.
48        if let Err(err) = table_metadata_manager
49            .update_leader_region_status(table_id, current_table_route_value, |route| {
50                if route.region.id == region_id
51                    && route
52                        .leader_peer
53                        .as_ref()
54                        .is_some_and(|leader_peer| leader_peer.id == from_peer_id)
55                {
56                    Some(Some(LeaderState::Downgrading))
57                } else {
58                    None
59                }
60            })
61            .await
62            .context(error::TableMetadataManagerSnafu)
63        {
64            ctx.remove_table_route_value();
65            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
66                reason: format!(
67                    "Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
68                ),
69            });
70        }
71
72        ctx.remove_table_route_value();
73
74        Ok(())
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use std::assert_matches::assert_matches;
81
82    use common_meta::key::test_utils::new_test_table_info;
83    use common_meta::peer::Peer;
84    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
85    use store_api::storage::RegionId;
86
87    use crate::error::Error;
88    use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
89    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
90    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
91    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
92
93    fn new_persistent_context() -> PersistentContext {
94        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
95    }
96
97    #[test]
98    fn test_state_serialization() {
99        let state = UpdateMetadata::Downgrade;
100        let expected = r#"{"UpdateMetadata":"Downgrade"}"#;
101        assert_eq!(expected, serde_json::to_string(&state).unwrap());
102    }
103
104    #[tokio::test]
105    async fn test_table_route_is_not_found_error() {
106        let state = UpdateMetadata::Downgrade;
107        let env = TestingEnv::new();
108        let persistent_context = new_persistent_context();
109        let mut ctx = env.context_factory().new_context(persistent_context);
110
111        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
112
113        assert_matches!(err, Error::TableRouteNotFound { .. });
114
115        assert!(!err.is_retryable());
116    }
117
118    #[tokio::test]
119    async fn test_failed_to_update_table_route_error() {
120        let state = UpdateMetadata::Downgrade;
121        let persistent_context = new_persistent_context();
122        let from_peer = persistent_context.from_peer.clone();
123
124        let env = TestingEnv::new();
125        let mut ctx = env.context_factory().new_context(persistent_context);
126        let table_id = ctx.region_id().table_id();
127
128        let table_info = new_test_table_info(1024, vec![1, 2]).into();
129        let region_routes = vec![
130            RegionRoute {
131                region: Region::new_test(RegionId::new(1024, 1)),
132                leader_peer: Some(from_peer.clone()),
133                ..Default::default()
134            },
135            RegionRoute {
136                region: Region::new_test(RegionId::new(1024, 2)),
137                leader_peer: Some(Peer::empty(4)),
138                ..Default::default()
139            },
140        ];
141
142        env.create_physical_table_metadata(table_info, region_routes)
143            .await;
144
145        let table_metadata_manager = env.table_metadata_manager();
146        let original_table_route = table_metadata_manager
147            .table_route_manager()
148            .table_route_storage()
149            .get_with_raw_bytes(table_id)
150            .await
151            .unwrap()
152            .unwrap();
153
154        // modifies the table route.
155        table_metadata_manager
156            .update_leader_region_status(table_id, &original_table_route, |route| {
157                if route.region.id == RegionId::new(1024, 2) {
158                    Some(Some(LeaderState::Downgrading))
159                } else {
160                    None
161                }
162            })
163            .await
164            .unwrap();
165
166        // sets the old table route.
167        ctx.volatile_ctx.table_route = Some(original_table_route);
168
169        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
170        assert!(ctx.volatile_ctx.table_route.is_none());
171        assert!(err.is_retryable());
172        assert!(format!("{err:?}").contains("Failed to update the table route"));
173    }
174
175    #[tokio::test]
176    async fn test_only_downgrade_from_peer() {
177        let mut state = Box::new(UpdateMetadata::Downgrade);
178        let persistent_context = new_persistent_context();
179
180        let env = TestingEnv::new();
181        let mut ctx = env.context_factory().new_context(persistent_context);
182        let table_id = ctx.region_id().table_id();
183
184        let table_info = new_test_table_info(1024, vec![1, 2]).into();
185        let region_routes = vec![RegionRoute {
186            region: Region::new_test(RegionId::new(1024, 1)),
187            leader_peer: Some(Peer::empty(1024)),
188            ..Default::default()
189        }];
190
191        env.create_physical_table_metadata(table_info, region_routes)
192            .await;
193
194        let table_metadata_manager = env.table_metadata_manager();
195
196        let procedure_ctx = new_procedure_context();
197        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
198
199        let _ = next
200            .as_any()
201            .downcast_ref::<DowngradeLeaderRegion>()
202            .unwrap();
203
204        let latest_table_route = table_metadata_manager
205            .table_route_manager()
206            .table_route_storage()
207            .get(table_id)
208            .await
209            .unwrap()
210            .unwrap();
211
212        // It should remain unchanged.
213        assert_eq!(latest_table_route.version().unwrap(), 0);
214        assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
215        assert!(ctx.volatile_ctx.table_route.is_none());
216    }
217
218    #[tokio::test]
219    async fn test_next_downgrade_leader_region_state() {
220        let mut state = Box::new(UpdateMetadata::Downgrade);
221        let persistent_context = new_persistent_context();
222        let from_peer = persistent_context.from_peer.clone();
223
224        let env = TestingEnv::new();
225        let mut ctx = env.context_factory().new_context(persistent_context);
226        let table_id = ctx.region_id().table_id();
227
228        let table_info = new_test_table_info(1024, vec![1, 2]).into();
229        let region_routes = vec![RegionRoute {
230            region: Region::new_test(RegionId::new(1024, 1)),
231            leader_peer: Some(from_peer.clone()),
232            ..Default::default()
233        }];
234
235        env.create_physical_table_metadata(table_info, region_routes)
236            .await;
237
238        let table_metadata_manager = env.table_metadata_manager();
239
240        let procedure_ctx = new_procedure_context();
241        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
242
243        let _ = next
244            .as_any()
245            .downcast_ref::<DowngradeLeaderRegion>()
246            .unwrap();
247
248        let latest_table_route = table_metadata_manager
249            .table_route_manager()
250            .table_route_storage()
251            .get(table_id)
252            .await
253            .unwrap()
254            .unwrap();
255
256        assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
257        assert!(ctx.volatile_ctx.table_route.is_none());
258    }
259}