meta_srv/procedure/region_migration/update_metadata/
upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::rpc::router::{region_distribution, RegionRoute};
18use common_telemetry::{info, warn};
19use snafu::{ensure, OptionExt, ResultExt};
20
21use crate::error::{self, Result};
22use crate::procedure::region_migration::update_metadata::UpdateMetadata;
23use crate::procedure::region_migration::Context;
24
25impl UpdateMetadata {
26    /// Returns new [Vec<RegionRoute>].
27    async fn build_upgrade_candidate_region_metadata(
28        &self,
29        ctx: &mut Context,
30    ) -> Result<Vec<RegionRoute>> {
31        let region_id = ctx.region_id();
32        let table_route_value = ctx.get_table_route_value().await?.clone();
33
34        let mut region_routes = table_route_value
35            .region_routes()
36            .context(error::UnexpectedLogicalRouteTableSnafu {
37                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
38            })?
39            .clone();
40        let region_route = region_routes
41            .iter_mut()
42            .find(|route| route.region.id == region_id)
43            .context(error::RegionRouteNotFoundSnafu { region_id })?;
44
45        // Removes downgraded status.
46        region_route.set_leader_state(None);
47
48        let candidate = &ctx.persistent_ctx.to_peer;
49        let expected_old_leader = &ctx.persistent_ctx.from_peer;
50
51        // Upgrades candidate to leader.
52        ensure!(region_route
53                .leader_peer
54                .take_if(|old_leader| old_leader.id == expected_old_leader.id)
55                .is_some(),
56                error::UnexpectedSnafu{
57                    violated: format!("Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}", region_route.leader_peer, expected_old_leader),
58                }
59            );
60
61        region_route.leader_peer = Some(candidate.clone());
62        info!(
63            "Upgrading candidate region to leader region: {:?} for region: {}",
64            candidate, region_id
65        );
66
67        // Removes the candidate region in followers.
68        let removed = region_route
69            .follower_peers
70            .extract_if(.., |peer| peer.id == candidate.id)
71            .collect::<Vec<_>>();
72
73        if removed.len() > 1 {
74            warn!(
75                    "Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}"
76                );
77        }
78
79        Ok(region_routes)
80    }
81
82    /// Returns true if region metadata has been updated.
83    async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> {
84        let region_id = ctx.region_id();
85        let table_route_value = ctx.get_table_route_value().await?.clone();
86
87        let region_routes = table_route_value
88            .region_routes()
89            .context(error::UnexpectedLogicalRouteTableSnafu {
90                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
91            })?
92            .clone();
93        let region_route = region_routes
94            .into_iter()
95            .find(|route| route.region.id == region_id)
96            .context(error::RegionRouteNotFoundSnafu { region_id })?;
97
98        let leader_peer = region_route
99            .leader_peer
100            .as_ref()
101            .context(error::UnexpectedSnafu {
102                violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"),
103            })?;
104
105        let candidate_peer_id = ctx.persistent_ctx.to_peer.id;
106
107        if leader_peer.id == candidate_peer_id {
108            ensure!(
109                !region_route.is_leader_downgrading(),
110                error::UnexpectedSnafu {
111                    violated: format!("Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"),
112                }
113            );
114
115            Ok(true)
116        } else {
117            Ok(false)
118        }
119    }
120
121    /// Upgrades the candidate region.
122    ///
123    /// Abort(non-retry):
124    /// - TableRoute or RegionRoute is not found.
125    ///   Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
126    ///
127    /// Retry:
128    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
129    /// - Failed to retrieve the metadata of table.
130    pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> {
131        let region_id = ctx.region_id();
132        let table_metadata_manager = ctx.table_metadata_manager.clone();
133
134        if self.check_metadata_updated(ctx).await? {
135            return Ok(());
136        }
137
138        let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?;
139        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
140        let RegionInfo {
141            region_storage_path,
142            region_options,
143            region_wal_options,
144            engine,
145        } = datanode_table_value.region_info.clone();
146        let table_route_value = ctx.get_table_route_value().await?;
147
148        let region_distribution = region_distribution(&region_routes);
149        info!(
150            "Trying to update region routes to {:?} for table: {}",
151            region_distribution,
152            region_id.table_id()
153        );
154        if let Err(err) = table_metadata_manager
155            .update_table_route(
156                region_id.table_id(),
157                RegionInfo {
158                    engine: engine.to_string(),
159                    region_storage_path: region_storage_path.to_string(),
160                    region_options: region_options.clone(),
161                    region_wal_options: region_wal_options.clone(),
162                },
163                table_route_value,
164                region_routes,
165                &region_options,
166                &region_wal_options,
167            )
168            .await
169            .context(error::TableMetadataManagerSnafu)
170        {
171            ctx.remove_table_route_value();
172            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
173                reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
174            });
175        };
176
177        ctx.remove_table_route_value();
178        ctx.deregister_failure_detectors().await;
179        // Consumes the guard.
180        ctx.volatile_ctx.opening_region_guard.take();
181
182        Ok(())
183    }
184}
185
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Persistent context shared by every test: region `(1024, 1)` with peer ids 1 and 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// With no table metadata stored, building the new routes fails with a
    /// non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let step = UpdateMetadata::Upgrade;

        let err = step
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The table only has a route for region `(1024, 2)`, so the migrating region
    /// `(1024, 1)` cannot be found.
    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let step = UpdateMetadata::Upgrade;

        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![2]).into(), routes)
            .await;

        let err = step
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The stored leader is peer 3 rather than the expected old leader, which is
    /// reported as a non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let step = UpdateMetadata::Upgrade;

        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let err = step
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    /// Happy path: peer 2 becomes the leader, leaves the follower list, and the
    /// downgrading marks are cleared.
    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let step = UpdateMetadata::Upgrade;

        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let upgraded = step
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap();
        let route = &upgraded[0];

        assert!(!route.is_leader_downgrading());
        assert!(route.leader_down_since.is_none());
        assert_eq!(route.follower_peers, vec![Peer::empty(3)]);
        assert_eq!(route.leader_peer.as_ref().unwrap().id, 2);
    }

    /// The context holds a table route snapshot that is older than the stored one,
    /// so the update fails with a retryable error; the cached route is dropped and
    /// the opening region guard is kept.
    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let step = UpdateMetadata::Upgrade;
        let keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(5), Peer::empty(3)],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(current_time_millis()),
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
        ];
        env.create_physical_table_metadata(new_test_table_info(table_id, vec![1]).into(), routes)
            .await;

        // Snapshot of the table route as it is stored right now.
        let manager = env.table_metadata_manager();
        let stale_table_route = manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // Changes the stored table route behind the snapshot's back.
        manager
            .update_leader_region_status(table_id, &stale_table_route, |route| {
                // `Some(None)` removes the status of region 2; other routes are untouched.
                (route.region.id == RegionId::new(1024, 2)).then_some(None)
            })
            .await
            .unwrap();

        // Hands the outdated snapshot and an opening region guard to the context.
        ctx.volatile_ctx.table_route = Some(stale_table_route);
        ctx.volatile_ctx.opening_region_guard =
            Some(keeper.register(2, RegionId::new(table_id, 1)).unwrap());

        let err = step.upgrade_candidate_region(&mut ctx).await.unwrap_err();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_some());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
    }

    /// The old leader (`from_peer`) still leads the region: metadata is not updated yet.
    #[tokio::test]
    async fn test_check_metadata() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let old_leader = persistent_context.from_peer.clone();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let step = UpdateMetadata::Upgrade;

        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(old_leader),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let updated = step.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(!updated);
    }

    /// The candidate (`to_peer`) already leads the region and no leader state is
    /// set: metadata counts as updated.
    #[tokio::test]
    async fn test_check_metadata_updated() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate = persistent_context.to_peer.clone();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let step = UpdateMetadata::Upgrade;

        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let updated = step.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(updated);
    }

    /// The candidate leads the region but the route is still downgrading, which is
    /// rejected as an unexpected intermediate state.
    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate = persistent_context.to_peer.clone();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let step = UpdateMetadata::Upgrade;

        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
        }];
        env.create_physical_table_metadata(new_test_table_info(1024, vec![1]).into(), routes)
            .await;

        let err = step.check_metadata_updated(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    /// Running the whole state: the next state is `CloseDowngradedRegion`, the
    /// stored route names peer 2 as leader, and the volatile context is cleaned up.
    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        ctx.volatile_ctx.opening_region_guard =
            Some(keeper.register(2, RegionId::new(table_id, 1)).unwrap());

        env.create_physical_table_metadata(new_test_table_info(table_id, vec![1]).into(), routes)
            .await;

        let manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // Panics unless the follow-up state is `CloseDowngradedRegion`.
        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let stored_table_route = manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let stored_routes = stored_table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_none());
        assert_eq!(stored_routes.len(), 1);
        assert!(!stored_routes[0].is_leader_downgrading());
        assert!(stored_routes[0].follower_peers.is_empty());
        assert_eq!(stored_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}
496}