// meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::lock_key::TableLock;
18use common_meta::rpc::router::{RegionRoute, region_distribution};
19use common_procedure::ContextProviderRef;
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::Context;
26use crate::procedure::region_migration::update_metadata::UpdateMetadata;
27
28impl UpdateMetadata {
29    /// Returns new [Vec<RegionRoute>].
30    fn build_upgrade_candidate_region_metadata(
31        &self,
32        ctx: &mut Context,
33        region_ids: &[RegionId],
34        mut region_routes: Vec<RegionRoute>,
35    ) -> Result<Vec<RegionRoute>> {
36        let old_leader_peer = &ctx.persistent_ctx.from_peer;
37        let new_leader_peer = &ctx.persistent_ctx.to_peer;
38        for region_id in region_ids {
39            // Find the RegionRoute for this region_id.
40            let region_route = region_routes
41                .iter_mut()
42                .find(|route| route.region.id == *region_id)
43                .context(error::RegionRouteNotFoundSnafu {
44                    region_id: *region_id,
45                })?;
46
47            // Remove any "downgraded leader" state.
48            region_route.set_leader_state(None);
49
50            // Check old leader matches expectation before upgrading to new leader.
51            ensure!(
52                region_route
53                    .leader_peer
54                    .take_if(|old_leader| old_leader.id == old_leader_peer.id)
55                    .is_some(),
56                error::UnexpectedSnafu {
57                    violated: format!(
58                        "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
59                        region_route.leader_peer, old_leader_peer
60                    ),
61                }
62            );
63
64            // Set new leader.
65            region_route.leader_peer = Some(new_leader_peer.clone());
66
67            // Remove new leader from followers (avoids duplicate leader/follower).
68            let removed = region_route
69                .follower_peers
70                .extract_if(.., |peer| peer.id == new_leader_peer.id)
71                .collect::<Vec<_>>();
72
73            // Warn if more than one follower with the new leader id was present.
74            if removed.len() > 1 {
75                warn!(
76                    "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
77                );
78            }
79        }
80
81        info!(
82            "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
83            new_leader_peer, region_ids,
84        );
85
86        Ok(region_routes)
87    }
88
89    /// Checks if metadata has been upgraded for a list of regions by verifying if their
90    /// leader peers have been switched to a specified peer ID (`to_peer_id`) and that
91    /// no region is in a leader downgrading state.
92    ///
93    /// Returns:
94    /// - `Ok(true)` if all regions' leader is the target peer and no downgrading occurs.
95    /// - `Ok(false)` if any region's leader is not the target peer.
96    /// - Error if region route or leader peer cannot be found, or an unexpected state is detected.
97    fn check_metadata_updated(
98        &self,
99        ctx: &mut Context,
100        region_ids: &[RegionId],
101        region_routes: &[RegionRoute],
102    ) -> Result<bool> {
103        // Iterate through each provided region ID
104        for region_id in region_ids {
105            // Find the route info for this region
106            let region_route = region_routes
107                .iter()
108                .find(|route| route.region.id == *region_id)
109                .context(error::RegionRouteNotFoundSnafu {
110                    region_id: *region_id,
111                })?;
112
113            // Get the leader peer for the region, error if not found
114            let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu {
115                violated: format!(
116                    "The leader peer of region {region_id} is not found during the metadata upgrade check"
117                ),
118            })?;
119
120            // If the leader is not the expected peer, return false (i.e., not yet upgraded)
121            if leader_peer.id != ctx.persistent_ctx.to_peer.id {
122                return Ok(false);
123            } else {
124                // If leader matches but region is in leader downgrading state, error (unexpected state)
125                ensure!(
126                    !region_route.is_leader_downgrading(),
127                    error::UnexpectedSnafu {
128                        violated: format!(
129                            "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
130                        ),
131                    }
132                );
133            }
134        }
135
136        // All regions' leader match expected peer and are not downgrading; considered upgraded
137        Ok(true)
138    }
139
140    /// Upgrades the candidate region.
141    ///
142    /// Abort(non-retry):
143    /// - TableRoute or RegionRoute is not found.
144    ///   Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
145    ///
146    /// Retry:
147    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
148    /// - Failed to retrieve the metadata of table.
149    pub async fn upgrade_candidate_region(
150        &self,
151        ctx: &mut Context,
152        ctx_provider: &ContextProviderRef,
153    ) -> Result<()> {
154        let table_metadata_manager = ctx.table_metadata_manager.clone();
155        let table_regions = ctx.persistent_ctx.table_regions();
156        let from_peer_id = ctx.persistent_ctx.from_peer.id;
157        let to_peer_id = ctx.persistent_ctx.to_peer.id;
158
159        for (table_id, region_ids) in table_regions {
160            let table_lock = TableLock::Write(table_id).into();
161            let _guard = ctx_provider.acquire_lock(&table_lock).await;
162
163            let table_route_value = ctx.get_table_route_value(table_id).await?;
164            let region_routes = table_route_value.region_routes().with_context(|_| {
165                error::UnexpectedLogicalRouteTableSnafu {
166                    err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
167                }
168            })?;
169            if self.check_metadata_updated(ctx, &region_ids, region_routes)? {
170                continue;
171            }
172            let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
173            let RegionInfo {
174                region_storage_path,
175                region_options,
176                region_wal_options,
177                engine,
178            } = datanode_table_value.region_info.clone();
179            let new_region_routes = self.build_upgrade_candidate_region_metadata(
180                ctx,
181                &region_ids,
182                region_routes.clone(),
183            )?;
184            let region_distribution = region_distribution(region_routes);
185            info!(
186                "Trying to update region routes to {:?} for table: {}",
187                region_distribution, table_id,
188            );
189
190            if let Err(err) = table_metadata_manager
191                .update_table_route(
192                    table_id,
193                    RegionInfo {
194                        engine: engine.clone(),
195                        region_storage_path: region_storage_path.clone(),
196                        region_options: region_options.clone(),
197                        region_wal_options: region_wal_options.clone(),
198                    },
199                    &table_route_value,
200                    new_region_routes,
201                    &region_options,
202                    &region_wal_options,
203                )
204                .await
205                .context(error::TableMetadataManagerSnafu)
206            {
207                error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
208                return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
209                    reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
210                });
211            };
212            info!(
213                "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
214            );
215        }
216
217        ctx.deregister_failure_detectors().await;
218        // Consumes the guard.
219        ctx.volatile_ctx.opening_region_guards.clear();
220
221        Ok(())
222    }
223}
224
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Migration fixture: move region 1 of table 1024 from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    // Looking up the route of a table that was never created must fail with a
    // non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        let err = ctx.get_table_route_value(1024).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The table exists but contains only region 2; asking to upgrade region 1
    // must fail with a non-retryable `RegionRouteNotFound`.
    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![2]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The region's current leader (peer 3) does not match the expected old leader
    // (peer 1 from the persistent context), so the upgrade must abort with a
    // non-retryable `Unexpected` error mentioning the leader mismatch.
    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            // Unexpected leader: the context expects peer 1.
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    // Happy path: a downgrading leader (peer 1) with followers 2 and 3 is
    // upgraded. The new routes must clear the downgrading state, promote peer 2
    // to leader, and drop peer 2 from the follower list.
    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    // The leader is still the old peer (`from_peer`): the check must report
    // "not yet updated" (Ok(false)) rather than an error.
    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(!updated);
    }

    // The leader is already the candidate (`to_peer`) and not downgrading:
    // the check must report the metadata as updated (Ok(true)).
    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(updated);
    }

    // The leader is the candidate but still marked as downgrading — an
    // inconsistent intermediate state that must surface as an `Unexpected` error.
    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    // End-to-end: running the Upgrade state must transition to
    // `CloseDowngradedRegion`, persist the upgraded route (leader = peer 2,
    // no downgrading state, no followers), and release the opening-region guard.
    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        // Simulate the candidate region being opened on peer 2; the state is
        // expected to consume (clear) this guard on success.
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The next state must be CloseDowngradedRegion.
        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        // Re-read the persisted route to verify what was written to the store.
        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}