meta_srv/procedure/region_migration/update_metadata/
rollback_downgraded_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use snafu::ResultExt;
17
18use crate::error::{self, Result};
19use crate::procedure::region_migration::update_metadata::UpdateMetadata;
20use crate::procedure::region_migration::Context;
21
22impl UpdateMetadata {
23    /// Rollbacks the downgraded leader region if the candidate region is unreachable.
24    ///
25    /// Abort(non-retry):
26    /// - TableRoute is not found.
27    ///
28    /// Retry:
29    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
30    /// - Failed to retrieve the metadata of table.
31    pub async fn rollback_downgraded_region(&self, ctx: &mut Context) -> Result<()> {
32        let table_metadata_manager = ctx.table_metadata_manager.clone();
33        let region_id = ctx.region_id();
34        let table_id = region_id.table_id();
35        let current_table_route_value = ctx.get_table_route_value().await?;
36
37        if let Err(err) = table_metadata_manager
38            .update_leader_region_status(table_id, current_table_route_value, |route| {
39                if route.region.id == region_id {
40                    Some(None)
41                } else {
42                    None
43                }
44            })
45            .await
46            .context(error::TableMetadataManagerSnafu)
47        {
48            ctx.remove_table_route_value();
49            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
50                reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
51            });
52        }
53
54        ctx.register_failure_detectors().await;
55        ctx.remove_table_route_value();
56
57        Ok(())
58    }
59}
60
61#[cfg(test)]
62mod tests {
63    use std::assert_matches::assert_matches;
64    use std::sync::Arc;
65
66    use common_meta::key::test_utils::new_test_table_info;
67    use common_meta::peer::Peer;
68    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
69    use store_api::storage::RegionId;
70
71    use crate::error::Error;
72    use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
73    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
74    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
75    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
76    use crate::region::supervisor::RegionFailureDetectorControl;
77
78    fn new_persistent_context() -> PersistentContext {
79        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
80    }
81
82    #[tokio::test]
83    async fn test_table_route_is_not_found_error() {
84        let state = UpdateMetadata::Rollback;
85        let env = TestingEnv::new();
86        let persistent_context = new_persistent_context();
87        let mut ctx = env.context_factory().new_context(persistent_context);
88
89        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
90
91        assert_matches!(err, Error::TableRouteNotFound { .. });
92
93        assert!(!err.is_retryable());
94    }
95
96    #[tokio::test]
97    async fn test_update_table_route_with_retry() {
98        let state = UpdateMetadata::Rollback;
99        let persistent_context = new_persistent_context();
100        let from_peer = persistent_context.from_peer.clone();
101
102        let env = TestingEnv::new();
103        let mut ctx = env.context_factory().new_context(persistent_context);
104        let (tx, mut rx) = tokio::sync::mpsc::channel(8);
105        ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
106        let table_id = ctx.region_id().table_id();
107
108        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
109        let region_routes = vec![
110            RegionRoute {
111                region: Region::new_test(RegionId::new(1024, 1)),
112                leader_peer: Some(from_peer.clone()),
113                leader_state: Some(LeaderState::Downgrading),
114                ..Default::default()
115            },
116            RegionRoute {
117                region: Region::new_test(RegionId::new(1024, 2)),
118                leader_peer: Some(Peer::empty(4)),
119                leader_state: Some(LeaderState::Downgrading),
120                ..Default::default()
121            },
122            RegionRoute {
123                region: Region::new_test(RegionId::new(1024, 3)),
124                leader_peer: Some(Peer::empty(5)),
125                ..Default::default()
126            },
127        ];
128
129        let expected_region_routes = {
130            let mut region_routes = region_routes.clone();
131            region_routes[0].leader_state = None;
132            region_routes[1].leader_state = None;
133            region_routes
134        };
135
136        env.create_physical_table_metadata(table_info, region_routes)
137            .await;
138
139        let table_metadata_manager = env.table_metadata_manager();
140        let old_table_route = table_metadata_manager
141            .table_route_manager()
142            .table_route_storage()
143            .get_with_raw_bytes(table_id)
144            .await
145            .unwrap()
146            .unwrap();
147
148        // modifies the table route.
149        table_metadata_manager
150            .update_leader_region_status(table_id, &old_table_route, |route| {
151                if route.region.id == RegionId::new(1024, 2) {
152                    Some(None)
153                } else {
154                    None
155                }
156            })
157            .await
158            .unwrap();
159
160        ctx.volatile_ctx.table_route = Some(old_table_route);
161
162        let err = state
163            .rollback_downgraded_region(&mut ctx)
164            .await
165            .unwrap_err();
166        assert!(ctx.volatile_ctx.table_route.is_none());
167        assert!(err.is_retryable());
168        assert!(format!("{err:?}").contains("Failed to update the table route"));
169        assert_eq!(rx.len(), 0);
170        state.rollback_downgraded_region(&mut ctx).await.unwrap();
171        let event = rx.try_recv().unwrap();
172        let detecting_regions = event.into_region_failure_detectors();
173        assert_eq!(
174            detecting_regions,
175            vec![(from_peer.id, ctx.persistent_ctx.region_id)]
176        );
177
178        let table_route = table_metadata_manager
179            .table_route_manager()
180            .table_route_storage()
181            .get(table_id)
182            .await
183            .unwrap()
184            .unwrap();
185        assert_eq!(
186            &expected_region_routes,
187            table_route.region_routes().unwrap()
188        );
189    }
190
191    #[tokio::test]
192    async fn test_next_migration_end_state() {
193        let mut state = Box::new(UpdateMetadata::Rollback);
194        let persistent_context = new_persistent_context();
195        let from_peer = persistent_context.from_peer.clone();
196
197        let env = TestingEnv::new();
198        let mut ctx = env.context_factory().new_context(persistent_context);
199        let table_id = ctx.region_id().table_id();
200
201        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
202        let region_routes = vec![
203            RegionRoute {
204                region: Region::new_test(RegionId::new(1024, 1)),
205                leader_peer: Some(from_peer.clone()),
206                leader_state: Some(LeaderState::Downgrading),
207                ..Default::default()
208            },
209            RegionRoute {
210                region: Region::new_test(RegionId::new(1024, 2)),
211                leader_peer: Some(Peer::empty(4)),
212                leader_state: Some(LeaderState::Downgrading),
213                ..Default::default()
214            },
215            RegionRoute {
216                region: Region::new_test(RegionId::new(1024, 3)),
217                leader_peer: Some(Peer::empty(5)),
218                ..Default::default()
219            },
220        ];
221
222        let expected_region_routes = {
223            let mut region_routes = region_routes.clone();
224            region_routes[0].leader_state = None;
225            region_routes
226        };
227
228        env.create_physical_table_metadata(table_info, region_routes)
229            .await;
230
231        let table_metadata_manager = env.table_metadata_manager();
232
233        let procedure_ctx = new_procedure_context();
234        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
235
236        let _ = next
237            .as_any()
238            .downcast_ref::<RegionMigrationAbort>()
239            .unwrap();
240
241        assert!(ctx.volatile_ctx.table_route.is_none());
242
243        let table_route = table_metadata_manager
244            .table_route_manager()
245            .table_route_storage()
246            .get(table_id)
247            .await
248            .unwrap()
249            .unwrap();
250        assert_eq!(
251            &expected_region_routes,
252            table_route.region_routes().unwrap()
253        );
254    }
255}