meta_srv/procedure/region_migration/
migration_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::peer::Peer;
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
26use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
27use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
28use crate::procedure::region_migration::{Context, State};
29
30/// The behaviors:
31///
32/// - If all regions have been migrated, transitions to [RegionMigrationEnd].
33/// - If any of the region leaders is not the `from_peer`, transitions to [RegionMigrationAbort].
34/// - Otherwise, continues with [OpenCandidateRegion] to initiate the candidate region.
35#[derive(Debug, Serialize, Deserialize)]
36pub struct RegionMigrationStart;
37
38#[async_trait::async_trait]
39#[typetag::serde]
40impl State for RegionMigrationStart {
41    /// Yields next [State].
42    ///
43    /// Determines the next [State] for region migration:
44    ///
45    /// - If all regions have been migrated, transitions to [RegionMigrationEnd].
46    /// - If any of the region leaders is not the `from_peer`, transitions to [RegionMigrationAbort].
47    /// - Otherwise, continues with [OpenCandidateRegion] to initiate the candidate region.
48    async fn next(
49        &mut self,
50        ctx: &mut Context,
51        _procedure_ctx: &ProcedureContext,
52    ) -> Result<(Box<dyn State>, Status)> {
53        let mut region_routes = self.retrieve_region_routes(ctx).await?;
54        let to_peer = &ctx.persistent_ctx.to_peer;
55        let from_peer = &ctx.persistent_ctx.from_peer;
56        let region_ids = &ctx.persistent_ctx.region_ids;
57
58        self.filter_unmigrated_regions(&mut region_routes, to_peer);
59
60        // No region to migrate, skip the migration.
61        if region_routes.is_empty() {
62            info!(
63                "All regions have been migrated, regions: {:?}, to_peer: {:?}",
64                region_ids, to_peer
65            );
66            return Ok((Box::new(RegionMigrationEnd), Status::done()));
67        }
68
69        // Updates the region ids to the unmigrated regions.
70        if region_routes.len() != region_ids.len() {
71            let unmigrated_region_ids = region_routes.iter().map(|route| route.region.id).collect();
72            info!(
73                "Some of the regions have been migrated, only migrate the following regions: {:?}, to_peer: {:?}",
74                unmigrated_region_ids, to_peer
75            );
76            ctx.persistent_ctx.region_ids = unmigrated_region_ids;
77        }
78
79        // Checks if any of the region leaders is not the `from_peer`.
80        for region_route in &region_routes {
81            if self.invalid_leader_peer(region_route, from_peer)? {
82                info!(
83                    "Abort region migration, region:{}, unexpected leader peer: {:?}, expected: {:?}",
84                    region_route.region.id, region_route.leader_peer, from_peer,
85                );
86                return Ok((
87                    Box::new(RegionMigrationAbort::new(&format!(
88                        "Invalid region leader peer: {:?}, expected: {:?}",
89                        region_route.leader_peer.as_ref().unwrap(),
90                        from_peer,
91                    ))),
92                    Status::done(),
93                ));
94            }
95        }
96
97        // If all checks pass, open the candidate region.
98        Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
99    }
100
101    fn as_any(&self) -> &dyn Any {
102        self
103    }
104}
105
106impl RegionMigrationStart {
107    /// Retrieves region routes for multiple regions.
108    ///
109    /// Abort(non-retry):
110    /// - TableRoute is not found.
111    /// - RegionRoute is not found.
112    ///
113    /// Retry:
114    /// - Failed to retrieve the metadata of table.
115    async fn retrieve_region_routes(&self, ctx: &mut Context) -> Result<Vec<RegionRoute>> {
116        let region_ids = &ctx.persistent_ctx.region_ids;
117        let table_route_values = ctx.get_table_route_values().await?;
118        let mut region_routes = Vec::with_capacity(region_ids.len());
119        for region_id in region_ids {
120            let table_id = region_id.table_id();
121            let region_route = table_route_values
122                .get(&table_id)
123                .context(error::TableRouteNotFoundSnafu { table_id })?
124                .region_routes()
125                .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
126                    err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
127                })?
128                .iter()
129                .find(|route| route.region.id == *region_id)
130                .cloned()
131                .with_context(|| error::UnexpectedSnafu {
132                    violated: format!(
133                        "RegionRoute({}) is not found in TableRoute({})",
134                        region_id, table_id
135                    ),
136                })?;
137            region_routes.push(region_route);
138        }
139
140        Ok(region_routes)
141    }
142
143    /// Returns true if the region leader is not the `from_peer`.
144    ///     
145    /// Abort(non-retry):
146    /// - Leader peer of RegionRoute is not found.
147    fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
148        let region_id = region_route.region.id;
149
150        let is_invalid_leader_peer = region_route
151            .leader_peer
152            .as_ref()
153            .with_context(|| error::UnexpectedSnafu {
154                violated: format!("Leader peer is not found in TableRoute({})", region_id),
155            })?
156            .id
157            != from_peer.id;
158        Ok(is_invalid_leader_peer)
159    }
160
161    /// Filters out regions that unmigrated.
162    fn filter_unmigrated_regions(&self, region_routes: &mut Vec<RegionRoute>, to_peer: &Peer) {
163        region_routes
164            .retain(|region_route| !self.has_migrated(region_route, to_peer).unwrap_or(false));
165    }
166
167    /// Checks whether the region has been migrated.
168    /// Returns true if it's.
169    ///     
170    /// Abort(non-retry):
171    /// - Leader peer of RegionRoute is not found.
172    fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
173        let region_id = region_route.region.id;
174
175        let region_migrated = region_route
176            .leader_peer
177            .as_ref()
178            .with_context(|| error::UnexpectedSnafu {
179                violated: format!("Leader peer is not found in TableRoute({})", region_id),
180            })?
181            .id
182            == to_peer.id;
183        Ok(region_migrated)
184    }
185}
186
#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = RegionMigrationStart;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found_error() {
        let state = RegionMigrationStart;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![3]).into();
        let region_route = RegionRoute {
            region: Region::new_test(RegionId::new(1024, 3)),
            leader_peer: Some(from_peer.clone()),
            ..Default::default()
        };

        env.create_physical_table_metadata(table_info, vec![region_route])
            .await;
        let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_next_migration_end_state() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let to_peer = persistent_context.to_peer.clone();
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(to_peer),
            follower_peers: vec![from_peer],
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
    }

    #[tokio::test]
    async fn test_next_open_candidate_region_state() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
    }

    /// When some of the requested regions already have `to_peer` as leader, only the
    /// remaining regions are kept in the persistent context and migration continues.
    #[tokio::test]
    async fn test_next_skips_migrated_regions() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let mut persistent_context = new_persistent_context();
        let migrated_region_id = RegionId::new(1024, 1);
        let unmigrated_region_id = RegionId::new(1024, 2);
        persistent_context.region_ids = vec![migrated_region_id, unmigrated_region_id];
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1, 2]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(migrated_region_id),
                leader_peer: Some(to_peer),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(unmigrated_region_id),
                leader_peer: Some(from_peer),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
        assert_eq!(ctx.persistent_ctx.region_ids, vec![unmigrated_region_id]);
    }

    /// A region route without a leader peer must fail with a non-retryable error rather
    /// than being silently treated as migrated or unmigrated.
    #[tokio::test]
    async fn test_next_leader_peer_is_not_found_error() {
        let mut state = Box::new(RegionMigrationStart);
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: None,
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let Err(err) = state.next(&mut ctx, &procedure_ctx).await else {
            panic!("expected an error when the leader peer is missing");
        };
        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_next_migration_abort() {
        let mut state = Box::new(RegionMigrationStart);
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes: Vec<RegionRoute> = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(1024)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<RegionMigrationAbort>()
            .unwrap();
    }
}