meta_srv/procedure/region_migration/
migration_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::peer::Peer;
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{self, Result};
26use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
27use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
28use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
29use crate::procedure::region_migration::update_metadata::UpdateMetadata;
30use crate::procedure::region_migration::{Context, State};
31
/// The entry state of the region migration procedure.
///
/// The behaviors:
///
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
///
/// If the leader peer of the region is not the `from_peer`, go to the [RegionMigrationAbort] state.
///
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart;
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for RegionMigrationStart {
45    /// Yields next [State].
46    ///
47    /// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
48    ///
49    /// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
50    ///
51    /// Otherwise go to the [OpenCandidateRegion] state.
52    async fn next(
53        &mut self,
54        ctx: &mut Context,
55        _procedure_ctx: &ProcedureContext,
56    ) -> Result<(Box<dyn State>, Status)> {
57        let region_id = ctx.persistent_ctx.region_id;
58        let region_route = self.retrieve_region_route(ctx, region_id).await?;
59        let to_peer = &ctx.persistent_ctx.to_peer;
60        let from_peer = &ctx.persistent_ctx.from_peer;
61
62        if self.has_migrated(&region_route, to_peer)? {
63            info!(
64                "Region has been migrated, region: {:?}, to_peer: {:?}",
65                region_route.region.id, to_peer
66            );
67            Ok((Box::new(RegionMigrationEnd), Status::done()))
68        } else if self.invalid_leader_peer(&region_route, from_peer)? {
69            info!(
70                "Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
71                region_route.region.id, region_route.leader_peer, from_peer,
72            );
73            Ok((
74                Box::new(RegionMigrationAbort::new(&format!(
75                    "Invalid region leader peer: {from_peer:?}, expected: {:?}",
76                    region_route.leader_peer.as_ref().unwrap(),
77                ))),
78                Status::done(),
79            ))
80        } else if self.check_candidate_region_on_peer(&region_route, to_peer) {
81            Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
82        } else {
83            Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
84        }
85    }
86
87    fn as_any(&self) -> &dyn Any {
88        self
89    }
90}
91
92impl RegionMigrationStart {
93    /// Retrieves region route.
94    ///
95    /// Abort(non-retry):
96    /// - TableRoute is not found.
97    /// - RegionRoute is not found.
98    ///
99    /// Retry:
100    /// - Failed to retrieve the metadata of table.
101    async fn retrieve_region_route(
102        &self,
103        ctx: &mut Context,
104        region_id: RegionId,
105    ) -> Result<RegionRoute> {
106        let table_id = region_id.table_id();
107        let table_route = ctx.get_table_route_value().await?;
108
109        let region_route = table_route
110            .region_routes()
111            .context(error::UnexpectedLogicalRouteTableSnafu {
112                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113            })?
114            .iter()
115            .find(|route| route.region.id == region_id)
116            .cloned()
117            .context(error::UnexpectedSnafu {
118                violated: format!(
119                    "RegionRoute({}) is not found in TableRoute({})",
120                    region_id, table_id
121                ),
122            })?;
123
124        Ok(region_route)
125    }
126
127    /// Checks whether the candidate region on region has been opened.
128    /// Returns true if it's been opened.
129    fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
130        region_route
131            .follower_peers
132            .iter()
133            .any(|peer| peer.id == to_peer.id)
134    }
135
136    /// Returns true if the region leader is not the `from_peer`.
137    ///     
138    /// Abort(non-retry):
139    /// - Leader peer of RegionRoute is not found.
140    fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
141        let region_id = region_route.region.id;
142
143        let is_invalid_leader_peer = region_route
144            .leader_peer
145            .as_ref()
146            .context(error::UnexpectedSnafu {
147                violated: format!("Leader peer is not found in TableRoute({})", region_id),
148            })?
149            .id
150            != from_peer.id;
151        Ok(is_invalid_leader_peer)
152    }
153
154    /// Checks whether the region has been migrated.
155    /// Returns true if it's.
156    ///     
157    /// Abort(non-retry):
158    /// - Leader peer of RegionRoute is not found.
159    fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
160        let region_id = region_route.region.id;
161
162        let region_migrated = region_route
163            .leader_peer
164            .as_ref()
165            .context(error::UnexpectedSnafu {
166                violated: format!("Leader peer is not found in TableRoute({})", region_id),
167            })?
168            .id
169            == to_peer.id;
170        Ok(region_migrated)
171    }
172}
173
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    /// Builds the persistent context shared by all tests:
    /// region (1024, 1), from_peer: 1, to_peer: 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Without any table metadata the lookup fails with a non-retryable
    /// `TableRouteNotFound` error.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let state = RegionMigrationStart;

        let lookup = state
            .retrieve_region_route(&mut ctx, RegionId::new(1024, 1))
            .await;
        let err = lookup.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// Looking up a region that is absent from the table route fails with a
    /// non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_region_route_is_not_found_error() {
        let persistent_context = new_persistent_context();
        let leader = persistent_context.from_peer.clone();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // Only region (1024, 1) is registered; region (1024, 3) is queried below.
        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader),
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let state = RegionMigrationStart;
        let lookup = state
            .retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
            .await;
        let err = lookup.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    /// Leader is `from_peer` and `to_peer` is already a follower:
    /// the next state is `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let leader = Peer::empty(persistent_context.from_peer.id);
        let candidate = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(leader),
            follower_peers: vec![candidate],
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert_matches!(
            next.as_any().downcast_ref::<UpdateMetadata>().unwrap(),
            UpdateMetadata::Downgrade
        );
    }

    /// Leader is already `to_peer`: the next state is `RegionMigrationEnd`.
    #[tokio::test]
    async fn test_next_migration_end_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let new_leader = persistent_context.to_peer.clone();
        let old_leader = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(new_leader),
            follower_peers: vec![old_leader],
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<RegionMigrationEnd>());
    }

    /// Leader is `from_peer` and there are no followers:
    /// the next state is `OpenCandidateRegion`.
    #[tokio::test]
    async fn test_next_open_candidate_region_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let leader = Peer::empty(persistent_context.from_peer.id);
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(leader),
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<OpenCandidateRegion>());
    }

    /// Leader is neither `from_peer` nor `to_peer`:
    /// the next state is `RegionMigrationAbort`.
    #[tokio::test]
    async fn test_next_migration_abort() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1024)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<RegionMigrationAbort>());
    }
}