meta_srv/procedure/region_migration/update_metadata/
downgrade_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::lock_key::TableLock;
17use common_meta::rpc::router::LeaderState;
18use common_procedure::ContextProviderRef;
19use common_telemetry::{error, info};
20use snafu::ResultExt;
21
22use crate::error::{self, Result};
23use crate::procedure::region_migration::Context;
24use crate::procedure::region_migration::update_metadata::UpdateMetadata;
25
26impl UpdateMetadata {
27    /// Downgrades the leader region.
28    ///
29    /// Abort(non-retry):
30    /// - TableRoute is not found.
31    ///
32    /// Retry:
33    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
34    /// - Failed to retrieve the metadata of table.
35    ///
36    /// About the failure of updating the [TableRouteValue](common_meta::key::table_region::TableRegionValue):
37    ///
38    /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure)
39    ///   that is executed concurrently for **other region**.
40    ///   It will only update **other region** info. Therefore, It's safe to retry after failure.
41    ///
42    /// - There is no other DDL procedure executed concurrently for the current table.
43    pub async fn downgrade_leader_region(
44        &self,
45        ctx: &mut Context,
46        ctx_provider: &ContextProviderRef,
47    ) -> Result<()> {
48        let table_metadata_manager = ctx.table_metadata_manager.clone();
49        let from_peer_id = ctx.persistent_ctx.from_peer.id;
50        let table_regions = ctx.persistent_ctx.table_regions();
51
52        for (table_id, regions) in table_regions {
53            let table_lock = TableLock::Write(table_id).into();
54            let _guard = ctx_provider.acquire_lock(&table_lock).await;
55
56            let current_table_route_value = ctx.get_table_route_value(table_id).await?;
57            if let Err(err) = table_metadata_manager
58                .update_leader_region_status(table_id, &current_table_route_value, |route| {
59                    if regions.contains(&route.region.id)
60                        && route
61                            .leader_peer
62                            .as_ref()
63                            .is_some_and(|leader_peer| leader_peer.id == from_peer_id)
64                    {
65                        Some(Some(LeaderState::Downgrading))
66                    } else {
67                        None
68                    }
69                })
70                .await
71                .context(error::TableMetadataManagerSnafu)
72            {
73                error!(err; "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}");
74                return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu {
75                    reason: format!(
76                        "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}"
77                    ),
78                });
79            }
80            info!(
81                "Downgrading leader region table route success, table_id: {table_id}, regions: {regions:?}, from_peer_id: {from_peer_id}"
82            );
83        }
84
85        Ok(())
86    }
87}
88
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_procedure_test::MockContextProvider;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Persistent context used by every test in this module: region `(1024, 1)`,
    /// presumably migrating from peer 1 to peer 2 (argument order of the test
    /// utility is not visible here).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Region routes holding the single region `(1024, 1)` led by `leader`.
    fn single_region_routes(leader: Peer) -> Vec<RegionRoute> {
        let route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader),
            ..Default::default()
        };
        vec![route]
    }

    #[test]
    fn test_state_serialization() {
        let state = UpdateMetadata::Downgrade;
        let serialized = serde_json::to_string(&state).unwrap();
        assert_eq!(r#"{"UpdateMetadata":"Downgrade"}"#, serialized);
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        // No table metadata is created, so looking up the table route has to fail.
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _;

        let result = UpdateMetadata::Downgrade
            .downgrade_leader_region(&mut ctx, &provider)
            .await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });

        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_only_downgrade_from_peer() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let table_id = ctx.persistent_ctx.region_ids[0].table_id();

        // The region is led by peer 1024 rather than the peer taken from the
        // persistent context, so the downgrade must skip it.
        let routes = single_region_routes(Peer::empty(1024));
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1, 2]).into(), routes)
            .await;

        let mut state = Box::new(UpdateMetadata::Downgrade);
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The procedure still advances to the `DowngradeLeaderRegion` state.
        let _ = next
            .as_any()
            .downcast_ref::<DowngradeLeaderRegion>()
            .unwrap();

        let latest_route = env
            .table_metadata_manager()
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        // It should remain unchanged.
        assert_eq!(latest_route.version().unwrap(), 0);
        let stored_routes = latest_route.region_routes().unwrap();
        assert!(!stored_routes[0].is_leader_downgrading());
    }

    #[tokio::test]
    async fn test_next_downgrade_leader_region_state() {
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_id = ctx.persistent_ctx.region_ids[0].table_id();

        // The region is led by the `from_peer`, so it must be marked as downgrading.
        let routes = single_region_routes(from_peer);
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1, 2]).into(), routes)
            .await;

        let mut state = Box::new(UpdateMetadata::Downgrade);
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<DowngradeLeaderRegion>()
            .unwrap();

        let latest_route = env
            .table_metadata_manager()
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        let stored_routes = latest_route.region_routes().unwrap();
        assert!(stored_routes[0].is_leader_downgrading());
    }
}