meta_srv/procedure/region_migration/update_metadata/
rollback_downgraded_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::lock_key::TableLock;
17use common_procedure::ContextProviderRef;
18use common_telemetry::{error, info};
19use snafu::ResultExt;
20
21use crate::error::{self, Result};
22use crate::procedure::region_migration::Context;
23use crate::procedure::region_migration::update_metadata::UpdateMetadata;
24
25impl UpdateMetadata {
26    /// Rollbacks the downgraded leader region if the candidate region is unreachable.
27    ///
28    /// Abort(non-retry):
29    /// - TableRoute is not found.
30    ///
31    /// Retry:
32    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
33    /// - Failed to retrieve the metadata of table.
34    pub async fn rollback_downgraded_region(
35        &self,
36        ctx: &mut Context,
37        ctx_provider: &ContextProviderRef,
38    ) -> Result<()> {
39        let table_metadata_manager = ctx.table_metadata_manager.clone();
40        let table_regions = ctx.persistent_ctx.table_regions();
41
42        for (table_id, regions) in table_regions {
43            let table_lock = TableLock::Write(table_id).into();
44            let _guard = ctx_provider.acquire_lock(&table_lock).await;
45
46            let current_table_route_value = ctx.get_table_route_value(table_id).await?;
47            if let Err(err) = table_metadata_manager
48                .update_leader_region_status(table_id, &current_table_route_value, |route| {
49                    if regions.contains(&route.region.id) {
50                        Some(None)
51                    } else {
52                        None
53                    }
54                })
55                .await
56                .context(error::TableMetadataManagerSnafu)
57            {
58                error!(err; "Failed to update the table route during the rollback downgraded leader regions: {regions:?}");
59                return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu {
60                    reason: format!("Failed to update the table route during the rollback downgraded leader regions: {regions:?}"),
61                });
62            }
63            info!(
64                "Rolling back downgraded leader region table route success, table_id: {table_id}, regions: {regions:?}"
65            );
66        }
67        ctx.register_failure_detectors().await;
68
69        Ok(())
70    }
71}
72
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_procedure_test::MockContextProvider;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Persistent context shared by the tests below; it targets region `(1024, 1)`.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// Builds a route for `region_id` led by `leader` with the `Downgrading` leader state.
    fn downgrading_route(region_id: RegionId, leader: Peer) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(leader),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }
    }

    /// Without any table metadata in place, the rollback must fail with a
    /// non-retryable `TableRouteNotFound` error.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _;

        let state = UpdateMetadata::Rollback;
        let result = state.rollback_downgraded_region(&mut ctx, &provider).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });

        assert!(!err.is_retryable());
    }

    /// Running the `Rollback` state must clear the leader state of the migrating
    /// region only and hand over to `RegionMigrationAbort`.
    #[tokio::test]
    async fn test_next_migration_end_state() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_id = ctx.persistent_ctx.region_ids[0].table_id();

        // Regions 1 and 2 start out downgrading; region 3 has no leader state.
        let initial_routes = vec![
            downgrading_route(RegionId::new(1024, 1), from_peer),
            downgrading_route(RegionId::new(1024, 2), Peer::empty(4)),
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 3)),
                leader_peer: Some(Peer::empty(5)),
                ..Default::default()
            },
        ];

        // Only the first route belongs to the migration, so only it is expected to change.
        let mut expected_routes = initial_routes.clone();
        expected_routes[0].leader_state = None;

        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
        env.create_physical_table_metadata(table_info, initial_routes)
            .await;

        let metadata_manager = env.table_metadata_manager();

        let mut state = Box::new(UpdateMetadata::Rollback);
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The unwrap panics unless the follow-up state is `RegionMigrationAbort`.
        let _ = next
            .as_any()
            .downcast_ref::<RegionMigrationAbort>()
            .unwrap();

        let stored_route = metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(&expected_routes, stored_route.region_routes().unwrap());
    }
}