// meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::lock_key::TableLock;
18use common_meta::rpc::router::{RegionRoute, region_distribution};
19use common_procedure::ContextProviderRef;
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::RegionId;
23
24use crate::error::{self, Result};
25use crate::procedure::region_migration::Context;
26use crate::procedure::region_migration::update_metadata::UpdateMetadata;
27
28impl UpdateMetadata {
29    /// Returns new [Vec<RegionRoute>].
30    fn build_upgrade_candidate_region_metadata(
31        &self,
32        ctx: &mut Context,
33        region_ids: &[RegionId],
34        mut region_routes: Vec<RegionRoute>,
35    ) -> Result<Vec<RegionRoute>> {
36        let old_leader_peer = &ctx.persistent_ctx.from_peer;
37        let new_leader_peer = &ctx.persistent_ctx.to_peer;
38        for region_id in region_ids {
39            // Find the RegionRoute for this region_id.
40            let region_route = region_routes
41                .iter_mut()
42                .find(|route| route.region.id == *region_id)
43                .context(error::RegionRouteNotFoundSnafu {
44                    region_id: *region_id,
45                })?;
46
47            // Remove any "downgraded leader" state.
48            region_route.set_leader_state(None);
49
50            // Check old leader matches expectation before upgrading to new leader.
51            ensure!(
52                region_route
53                    .leader_peer
54                    .take_if(|old_leader| old_leader.id == old_leader_peer.id)
55                    .is_some(),
56                error::UnexpectedSnafu {
57                    violated: format!(
58                        "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
59                        region_route.leader_peer, old_leader_peer
60                    ),
61                }
62            );
63
64            // Set new leader.
65            region_route.leader_peer = Some(new_leader_peer.clone());
66
67            // Remove new leader from followers (avoids duplicate leader/follower).
68            let removed = region_route
69                .follower_peers
70                .extract_if(.., |peer| peer.id == new_leader_peer.id)
71                .collect::<Vec<_>>();
72
73            // Warn if more than one follower with the new leader id was present.
74            if removed.len() > 1 {
75                warn!(
76                    "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
77                );
78            }
79        }
80
81        info!(
82            "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
83            new_leader_peer, region_ids,
84        );
85
86        Ok(region_routes)
87    }
88
89    /// Checks if metadata has been upgraded for a list of regions by verifying if their
90    /// leader peers have been switched to a specified peer ID (`to_peer_id`) and that
91    /// no region is in a leader downgrading state.
92    ///
93    /// Returns:
94    /// - `Ok(true)` if all regions' leader is the target peer and no downgrading occurs.
95    /// - `Ok(false)` if any region's leader is not the target peer.
96    /// - Error if region route or leader peer cannot be found, or an unexpected state is detected.
97    fn check_metadata_updated(
98        &self,
99        ctx: &mut Context,
100        region_ids: &[RegionId],
101        region_routes: &[RegionRoute],
102    ) -> Result<bool> {
103        // Iterate through each provided region ID
104        for region_id in region_ids {
105            // Find the route info for this region
106            let region_route = region_routes
107                .iter()
108                .find(|route| route.region.id == *region_id)
109                .context(error::RegionRouteNotFoundSnafu {
110                    region_id: *region_id,
111                })?;
112
113            // Get the leader peer for the region, error if not found
114            let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu {
115                violated: format!(
116                    "The leader peer of region {region_id} is not found during the metadata upgrade check"
117                ),
118            })?;
119
120            // If the leader is not the expected peer, return false (i.e., not yet upgraded)
121            if leader_peer.id != ctx.persistent_ctx.to_peer.id {
122                return Ok(false);
123            } else {
124                // If leader matches but region is in leader downgrading state, error (unexpected state)
125                ensure!(
126                    !region_route.is_leader_downgrading(),
127                    error::UnexpectedSnafu {
128                        violated: format!(
129                            "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
130                        ),
131                    }
132                );
133            }
134        }
135
136        // All regions' leader match expected peer and are not downgrading; considered upgraded
137        Ok(true)
138    }
139
140    /// Upgrades the candidate region.
141    ///
142    /// Abort(non-retry):
143    /// - TableRoute or RegionRoute is not found.
144    ///   Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
145    ///
146    /// Retry:
147    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
148    /// - Failed to retrieve the metadata of table.
149    pub async fn upgrade_candidate_region(
150        &self,
151        ctx: &mut Context,
152        ctx_provider: &ContextProviderRef,
153    ) -> Result<()> {
154        let table_metadata_manager = ctx.table_metadata_manager.clone();
155        let table_regions = ctx.persistent_ctx.table_regions();
156        let from_peer_id = ctx.persistent_ctx.from_peer.id;
157        let to_peer_id = ctx.persistent_ctx.to_peer.id;
158
159        for (table_id, region_ids) in table_regions {
160            let table_lock = TableLock::Write(table_id).into();
161            let _guard = ctx_provider.acquire_lock(&table_lock).await;
162
163            let table_route_value = ctx.get_table_route_value(table_id).await?;
164            let region_routes = table_route_value.region_routes().with_context(|_| {
165                error::UnexpectedLogicalRouteTableSnafu {
166                    err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
167                }
168            })?;
169            if self.check_metadata_updated(ctx, &region_ids, region_routes)? {
170                continue;
171            }
172            let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
173            let RegionInfo {
174                region_storage_path,
175                region_options,
176                region_wal_options,
177                engine,
178            } = datanode_table_value.region_info.clone();
179            let new_region_routes = self.build_upgrade_candidate_region_metadata(
180                ctx,
181                &region_ids,
182                region_routes.clone(),
183            )?;
184            let region_distribution = region_distribution(region_routes);
185            info!(
186                "Trying to update region routes to {:?} for table: {}",
187                region_distribution, table_id,
188            );
189
190            if let Err(err) = table_metadata_manager
191                .update_table_route(
192                    table_id,
193                    RegionInfo {
194                        engine: engine.clone(),
195                        region_storage_path: region_storage_path.clone(),
196                        region_options: region_options.clone(),
197                        region_wal_options: region_wal_options.clone(),
198                    },
199                    &table_route_value,
200                    new_region_routes,
201                    &region_options,
202                    &region_wal_options,
203                )
204                .await
205                .context(error::TableMetadataManagerSnafu)
206            {
207                error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
208                return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
209                    reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
210                });
211            };
212            info!(
213                "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
214            );
215        }
216
217        ctx.deregister_failure_detectors().await;
218        ctx.reset_failure_detectors_for_candidate_regions().await;
219        // Consumes the guard.
220        ctx.volatile_ctx.opening_region_guards.clear();
221
222        Ok(())
223    }
224}
225
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Migration of region (1024, 1) from peer 1 (`from_peer`) to peer 2 (`to_peer`).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let ctx = env.context_factory().new_context(persistent_context);

        let err = ctx.get_table_route_value(1024).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];
        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The current leader (peer 3) is not the expected `from_peer` (peer 1).
    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    /// A region without any leader cannot be upgraded.
    #[tokio::test]
    async fn test_region_route_missing_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: None,
            follower_peers: vec![Peer::empty(2)],
            ..Default::default()
        }];
        let err = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_route_value
            .into_inner()
            .into_physical_table_route()
            .region_routes;
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    /// Every follower entry of the new leader is removed, not only the first one.
    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata_duplicate_followers() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3), Peer::empty(2)],
            ..Default::default()
        }];
        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(
                &mut ctx,
                &[RegionId::new(1024, 1)],
                region_routes,
            )
            .unwrap();

        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    /// The leader is still the old peer, so the metadata is not updated yet.
    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;
        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(!updated);
    }

    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let updated = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap();
        assert!(updated);
    }

    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
            write_route_policy: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_routes = ctx.get_table_route_value(1024).await.unwrap();
        let region_routes = table_routes.region_routes().unwrap();
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    /// The check fails when a requested region has no route at all.
    #[tokio::test]
    async fn test_check_metadata_region_route_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(2)),
            ..Default::default()
        }];
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], &region_routes)
            .unwrap_err();
        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The check fails when the region route has no leader.
    #[tokio::test]
    async fn test_check_metadata_missing_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: None,
            follower_peers: vec![Peer::empty(2)],
            ..Default::default()
        }];
        let err = state
            .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], &region_routes)
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(
            err.to_string()
                .contains("is not found during the metadata upgrade check")
        );
    }

    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        let guard = opening_keeper
            .register_with_role(2, RegionId::new(table_id, 1), RegionRole::Follower)
            .unwrap();
        ctx.volatile_ctx.opening_region_guards.push(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}