meta_srv/procedure/region_migration/
migration_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::peer::Peer;
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::storage::RegionId;
24
25use crate::error::{self, Result};
26use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
27use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
28use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
29use crate::procedure::region_migration::update_metadata::UpdateMetadata;
30use crate::procedure::region_migration::{Context, State};
31
/// State that inspects the current region route and picks the next step of the region migration.
///
/// The behaviors (checked in this order by `next`):
///
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
///
/// If the leader of the region is not `from_peer`, go to the [RegionMigrationAbort] state.
///
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart;
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for RegionMigrationStart {
45    /// Yields next [State].
46    ///
47    /// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
48    ///
49    /// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
50    ///
51    /// Otherwise go to the [OpenCandidateRegion] state.
52    async fn next(
53        &mut self,
54        ctx: &mut Context,
55        _procedure_ctx: &ProcedureContext,
56    ) -> Result<(Box<dyn State>, Status)> {
57        let region_id = ctx.persistent_ctx.region_id;
58        let region_route = self.retrieve_region_route(ctx, region_id).await?;
59        let to_peer = &ctx.persistent_ctx.to_peer;
60        let from_peer = &ctx.persistent_ctx.from_peer;
61
62        if self.has_migrated(&region_route, to_peer)? {
63            info!(
64                "Region has been migrated, region: {:?}, to_peer: {:?}",
65                region_route.region.id, to_peer
66            );
67            Ok((Box::new(RegionMigrationEnd), Status::done()))
68        } else if self.invalid_leader_peer(&region_route, from_peer)? {
69            info!(
70                "Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
71                region_route.region.id, region_route.leader_peer, from_peer,
72            );
73            Ok((
74                Box::new(RegionMigrationAbort::new(&format!(
75                    "Invalid region leader peer: {from_peer:?}, expected: {:?}",
76                    region_route.leader_peer.as_ref().unwrap(),
77                ))),
78                Status::done(),
79            ))
80        } else if self.check_candidate_region_on_peer(&region_route, to_peer) {
81            Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
82        } else {
83            Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
84        }
85    }
86
87    fn as_any(&self) -> &dyn Any {
88        self
89    }
90}
91
92impl RegionMigrationStart {
93    /// Retrieves region route.
94    ///
95    /// Abort(non-retry):
96    /// - TableRoute is not found.
97    /// - RegionRoute is not found.
98    ///
99    /// Retry:
100    /// - Failed to retrieve the metadata of table.
101    async fn retrieve_region_route(
102        &self,
103        ctx: &mut Context,
104        region_id: RegionId,
105    ) -> Result<RegionRoute> {
106        let table_id = region_id.table_id();
107        let table_route = ctx.get_table_route_value().await?;
108
109        let region_route = table_route
110            .region_routes()
111            .context(error::UnexpectedLogicalRouteTableSnafu {
112                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
113            })?
114            .iter()
115            .find(|route| route.region.id == region_id)
116            .cloned()
117            .context(error::UnexpectedSnafu {
118                violated: format!(
119                    "RegionRoute({}) is not found in TableRoute({})",
120                    region_id, table_id
121                ),
122            })?;
123
124        Ok(region_route)
125    }
126
127    /// Checks whether the candidate region on region has been opened.
128    /// Returns true if it's been opened.
129    fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
130        let region_opened = region_route
131            .follower_peers
132            .iter()
133            .any(|peer| peer.id == to_peer.id);
134
135        region_opened
136    }
137
138    /// Returns true if the region leader is not the `from_peer`.
139    ///     
140    /// Abort(non-retry):
141    /// - Leader peer of RegionRoute is not found.
142    fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
143        let region_id = region_route.region.id;
144
145        let is_invalid_leader_peer = region_route
146            .leader_peer
147            .as_ref()
148            .context(error::UnexpectedSnafu {
149                violated: format!("Leader peer is not found in TableRoute({})", region_id),
150            })?
151            .id
152            != from_peer.id;
153        Ok(is_invalid_leader_peer)
154    }
155
156    /// Checks whether the region has been migrated.
157    /// Returns true if it's.
158    ///     
159    /// Abort(non-retry):
160    /// - Leader peer of RegionRoute is not found.
161    fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
162        let region_id = region_route.region.id;
163
164        let region_migrated = region_route
165            .leader_peer
166            .as_ref()
167            .context(error::UnexpectedSnafu {
168                violated: format!("Leader peer is not found in TableRoute({})", region_id),
169            })?
170            .id
171            == to_peer.id;
172        Ok(region_migrated)
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    /// Persistent context used by every test: region 1 of table 1024,
    /// from_peer: 1, to_peer: 2.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// Stores physical table metadata for table 1024 with the given region routes in `env`.
    async fn create_table_1024_metadata(env: &TestingEnv, region_routes: Vec<RegionRoute>) {
        let table_info = new_test_table_info(1024, vec![1]).into();
        env.create_physical_table_metadata(table_info, region_routes)
            .await;
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        // No table metadata is created, so the table route lookup has to fail.
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let state = RegionMigrationStart;

        let result = state
            .retrieve_region_route(&mut ctx, RegionId::new(1024, 1))
            .await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found_error() {
        let persistent_context = new_persistent_context();
        let leader = persistent_context.from_peer.clone();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // Only region 1 exists in the table route.
        let existing_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader),
            ..Default::default()
        };
        create_table_1024_metadata(&env, vec![existing_route]).await;

        // Region 3 is not part of the table route.
        let missing_region = RegionId::new(1024, 3);
        let state = RegionMigrationStart;
        let err = state
            .retrieve_region_route(&mut ctx, missing_region)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let leader = Peer::empty(persistent_context.from_peer.id);
        let candidate = persistent_context.to_peer.clone();
        let region = Region::new_test(persistent_context.region_id);

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // The candidate region is already opened on `to_peer` as a follower.
        let route = RegionRoute {
            region,
            leader_peer: Some(leader),
            follower_peers: vec![candidate],
            ..Default::default()
        };
        create_table_1024_metadata(&env, vec![route]).await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert_matches!(
            next.as_any().downcast_ref::<UpdateMetadata>(),
            Some(UpdateMetadata::Downgrade)
        );
    }

    #[tokio::test]
    async fn test_next_migration_end_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let new_leader = persistent_context.to_peer.clone();
        let old_leader = persistent_context.from_peer.clone();
        let region = Region::new_test(persistent_context.region_id);

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // The leader is already `to_peer`: the migration has been done.
        let route = RegionRoute {
            region,
            leader_peer: Some(new_leader),
            follower_peers: vec![old_leader],
            ..Default::default()
        };
        create_table_1024_metadata(&env, vec![route]).await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<RegionMigrationEnd>());
    }

    #[tokio::test]
    async fn test_next_open_candidate_region_state() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let leader = Peer::empty(persistent_context.from_peer.id);
        let region = Region::new_test(persistent_context.region_id);

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // The leader is `from_peer` and there is no follower on `to_peer` yet.
        let route = RegionRoute {
            region,
            leader_peer: Some(leader),
            ..Default::default()
        };
        create_table_1024_metadata(&env, vec![route]).await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<OpenCandidateRegion>());
    }

    #[tokio::test]
    async fn test_next_migration_abort() {
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region = Region::new_test(persistent_context.region_id);

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // The leader (peer 1024) is neither `from_peer` nor `to_peer`.
        let route = RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1024)),
            ..Default::default()
        };
        create_table_1024_metadata(&env, vec![route]).await;

        let mut state = RegionMigrationStart;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(next.as_any().is::<RegionMigrationAbort>());
    }
}
353}