// meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::lock_key::TableLock;
18use common_meta::rpc::router::{RegionRoute, region_distribution};
19use common_procedure::ContextProviderRef;
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::Context;
26use crate::procedure::region_migration::update_metadata::UpdateMetadata;
27
28impl UpdateMetadata {
29    /// Returns new [Vec<RegionRoute>].
30    fn build_upgrade_candidate_region_metadata(
31        &self,
32        ctx: &mut Context,
33        region_ids: &[RegionId],
34        mut region_routes: Vec<RegionRoute>,
35    ) -> Result<Vec<RegionRoute>> {
36        let old_leader_peer = &ctx.persistent_ctx.from_peer;
37        let new_leader_peer = &ctx.persistent_ctx.to_peer;
38        for region_id in region_ids {
39            // Find the RegionRoute for this region_id.
40            let region_route = region_routes
41                .iter_mut()
42                .find(|route| route.region.id == *region_id)
43                .context(error::RegionRouteNotFoundSnafu {
44                    region_id: *region_id,
45                })?;
46
47            // Remove any "downgraded leader" state.
48            region_route.set_leader_state(None);
49
50            // Check old leader matches expectation before upgrading to new leader.
51            ensure!(
52                region_route
53                    .leader_peer
54                    .take_if(|old_leader| old_leader.id == old_leader_peer.id)
55                    .is_some(),
56                error::UnexpectedSnafu {
57                    violated: format!(
58                        "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
59                        region_route.leader_peer, old_leader_peer
60                    ),
61                }
62            );
63
64            // Set new leader.
65            region_route.leader_peer = Some(new_leader_peer.clone());
66
67            // Remove new leader from followers (avoids duplicate leader/follower).
68            let removed = region_route
69                .follower_peers
70                .extract_if(.., |peer| peer.id == new_leader_peer.id)
71                .collect::<Vec<_>>();
72
73            // Warn if more than one follower with the new leader id was present.
74            if removed.len() > 1 {
75                warn!(
76                    "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
77                );
78            }
79        }
80
81        info!(
82            "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
83            new_leader_peer, region_ids,
84        );
85
86        Ok(region_routes)
87    }
88
89    /// Checks if metadata has been upgraded for a list of regions by verifying if their
90    /// leader peers have been switched to a specified peer ID (`to_peer_id`) and that
91    /// no region is in a leader downgrading state.
92    ///
93    /// Returns:
94    /// - `Ok(true)` if all regions' leader is the target peer and no downgrading occurs.
95    /// - `Ok(false)` if any region's leader is not the target peer.
96    /// - Error if region route or leader peer cannot be found, or an unexpected state is detected.
97    fn check_metadata_updated(
98        &self,
99        ctx: &mut Context,
100        region_ids: &[RegionId],
101        region_routes: &[RegionRoute],
102    ) -> Result<bool> {
103        // Iterate through each provided region ID
104        for region_id in region_ids {
105            // Find the route info for this region
106            let region_route = region_routes
107                .iter()
108                .find(|route| route.region.id == *region_id)
109                .context(error::RegionRouteNotFoundSnafu {
110                    region_id: *region_id,
111                })?;
112
113            // Get the leader peer for the region, error if not found
114            let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu {
115                violated: format!(
116                    "The leader peer of region {region_id} is not found during the metadata upgrade check"
117                ),
118            })?;
119
120            // If the leader is not the expected peer, return false (i.e., not yet upgraded)
121            if leader_peer.id != ctx.persistent_ctx.to_peer.id {
122                return Ok(false);
123            } else {
124                // If leader matches but region is in leader downgrading state, error (unexpected state)
125                ensure!(
126                    !region_route.is_leader_downgrading(),
127                    error::UnexpectedSnafu {
128                        violated: format!(
129                            "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
130                        ),
131                    }
132                );
133            }
134        }
135
136        // All regions' leader match expected peer and are not downgrading; considered upgraded
137        Ok(true)
138    }
139
140    /// Upgrades the candidate region.
141    ///
142    /// Abort(non-retry):
143    /// - TableRoute or RegionRoute is not found.
144    ///   Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
145    ///
146    /// Retry:
147    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
148    /// - Failed to retrieve the metadata of table.
149    pub async fn upgrade_candidate_region(
150        &self,
151        ctx: &mut Context,
152        ctx_provider: &ContextProviderRef,
153    ) -> Result<()> {
154        let table_metadata_manager = ctx.table_metadata_manager.clone();
155        let table_regions = ctx.persistent_ctx.table_regions();
156        let from_peer_id = ctx.persistent_ctx.from_peer.id;
157        let to_peer_id = ctx.persistent_ctx.to_peer.id;
158
159        for (table_id, region_ids) in table_regions {
160            let table_lock = TableLock::Write(table_id).into();
161            let _guard = ctx_provider.acquire_lock(&table_lock).await;
162
163            let table_route_value = ctx.get_table_route_value(table_id).await?;
164            let region_routes = table_route_value.region_routes().with_context(|_| {
165                error::UnexpectedLogicalRouteTableSnafu {
166                    err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
167                }
168            })?;
169            if self.check_metadata_updated(ctx, &region_ids, region_routes)? {
170                continue;
171            }
172            let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
173            let RegionInfo {
174                region_storage_path,
175                region_options,
176                region_wal_options,
177                engine,
178            } = datanode_table_value.region_info.clone();
179            let new_region_routes = self.build_upgrade_candidate_region_metadata(
180                ctx,
181                &region_ids,
182                region_routes.clone(),
183            )?;
184            let region_distribution = region_distribution(region_routes);
185            info!(
186                "Trying to update region routes to {:?} for table: {}",
187                region_distribution, table_id,
188            );
189
190            if let Err(err) = table_metadata_manager
191                .update_table_route(
192                    table_id,
193                    RegionInfo {
194                        engine: engine.clone(),
195                        region_storage_path: region_storage_path.clone(),
196                        region_options: region_options.clone(),
197                        region_wal_options: region_wal_options.clone(),
198                    },
199                    &table_route_value,
200                    new_region_routes,
201                    &region_options,
202                    &region_wal_options,
203                )
204                .await
205                .context(error::TableMetadataManagerSnafu)
206            {
207                error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
208                return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
209                    reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
210                });
211            };
212            info!(
213                "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
214            );
215        }
216
217        ctx.deregister_failure_detectors().await;
218        // Consumes the guard.
219        ctx.volatile_ctx.opening_region_guards.clear();
220
221        Ok(())
222    }
223}
224
#[cfg(test)]
mod tests {
    // Import the macro itself, not just the `std::assert_matches` module:
    // `use std::assert_matches;` alone leaves `assert_matches!(..)` unresolved.
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Migration context: move region 1 of table 1024 from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        // No table metadata was created, so the route lookup must fail.
        let err = ctx.get_table_route_value(1024).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // Only region 2 exists; asking to upgrade region 1 must fail.
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // Leader is peer 3, but the migration expects from_peer (peer 1).
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        // Downgrading leader (peer 1) with the candidate (peer 2) as a follower.
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        // Downgrading state cleared; peer 2 promoted to leader and removed from followers.
        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        // Leader is still from_peer, so metadata is not yet upgraded.
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        // Leader already is to_peer and no downgrading state: considered upgraded.
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        // Leader is to_peer but still marked Downgrading — an invalid combination.
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        // Simulate the candidate region (peer 2) being held open as a follower.
        let guard = opening_keeper
            .register_with_role(2, RegionId::new(table_id, 1), RegionRole::Follower)
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The state machine must transition to CloseDowngradedRegion.
        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        // Guards are consumed, and the persisted route now names peer 2 as leader.
        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}