meta_srv/procedure/region_migration/update_metadata/upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::key::datanode_table::RegionInfo;
17use common_meta::rpc::router::{RegionRoute, region_distribution};
18use common_telemetry::{info, warn};
19use snafu::{OptionExt, ResultExt, ensure};
20
21use crate::error::{self, Result};
22use crate::procedure::region_migration::Context;
23use crate::procedure::region_migration::update_metadata::UpdateMetadata;
24
25impl UpdateMetadata {
26    /// Returns new [Vec<RegionRoute>].
27    async fn build_upgrade_candidate_region_metadata(
28        &self,
29        ctx: &mut Context,
30    ) -> Result<Vec<RegionRoute>> {
31        let region_id = ctx.region_id();
32        let table_route_value = ctx.get_table_route_value().await?.clone();
33
34        let mut region_routes = table_route_value
35            .region_routes()
36            .context(error::UnexpectedLogicalRouteTableSnafu {
37                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
38            })?
39            .clone();
40        let region_route = region_routes
41            .iter_mut()
42            .find(|route| route.region.id == region_id)
43            .context(error::RegionRouteNotFoundSnafu { region_id })?;
44
45        // Removes downgraded status.
46        region_route.set_leader_state(None);
47
48        let candidate = &ctx.persistent_ctx.to_peer;
49        let expected_old_leader = &ctx.persistent_ctx.from_peer;
50
51        // Upgrades candidate to leader.
52        ensure!(
53            region_route
54                .leader_peer
55                .take_if(|old_leader| old_leader.id == expected_old_leader.id)
56                .is_some(),
57            error::UnexpectedSnafu {
58                violated: format!(
59                    "Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}",
60                    region_route.leader_peer, expected_old_leader
61                ),
62            }
63        );
64
65        region_route.leader_peer = Some(candidate.clone());
66        info!(
67            "Upgrading candidate region to leader region: {:?} for region: {}",
68            candidate, region_id
69        );
70
71        // Removes the candidate region in followers.
72        let removed = region_route
73            .follower_peers
74            .extract_if(.., |peer| peer.id == candidate.id)
75            .collect::<Vec<_>>();
76
77        if removed.len() > 1 {
78            warn!(
79                "Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}"
80            );
81        }
82
83        Ok(region_routes)
84    }
85
86    /// Returns true if region metadata has been updated.
87    async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> {
88        let region_id = ctx.region_id();
89        let table_route_value = ctx.get_table_route_value().await?.clone();
90
91        let region_routes = table_route_value
92            .region_routes()
93            .context(error::UnexpectedLogicalRouteTableSnafu {
94                err_msg: format!("{self:?} is a non-physical TableRouteValue."),
95            })?
96            .clone();
97        let region_route = region_routes
98            .into_iter()
99            .find(|route| route.region.id == region_id)
100            .context(error::RegionRouteNotFoundSnafu { region_id })?;
101
102        let leader_peer = region_route
103            .leader_peer
104            .as_ref()
105            .context(error::UnexpectedSnafu {
106                violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"),
107            })?;
108
109        let candidate_peer_id = ctx.persistent_ctx.to_peer.id;
110
111        if leader_peer.id == candidate_peer_id {
112            ensure!(
113                !region_route.is_leader_downgrading(),
114                error::UnexpectedSnafu {
115                    violated: format!(
116                        "Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"
117                    ),
118                }
119            );
120
121            Ok(true)
122        } else {
123            Ok(false)
124        }
125    }
126
127    /// Upgrades the candidate region.
128    ///
129    /// Abort(non-retry):
130    /// - TableRoute or RegionRoute is not found.
131    ///   Typically, it's impossible, there is no other DDL procedure executed concurrently for the current table.
132    ///
133    /// Retry:
134    /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
135    /// - Failed to retrieve the metadata of table.
136    pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> {
137        let region_id = ctx.region_id();
138        let table_metadata_manager = ctx.table_metadata_manager.clone();
139
140        if self.check_metadata_updated(ctx).await? {
141            return Ok(());
142        }
143
144        let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?;
145        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
146        let RegionInfo {
147            region_storage_path,
148            region_options,
149            region_wal_options,
150            engine,
151        } = datanode_table_value.region_info.clone();
152        let table_route_value = ctx.get_table_route_value().await?;
153
154        let region_distribution = region_distribution(&region_routes);
155        info!(
156            "Trying to update region routes to {:?} for table: {}",
157            region_distribution,
158            region_id.table_id()
159        );
160        if let Err(err) = table_metadata_manager
161            .update_table_route(
162                region_id.table_id(),
163                RegionInfo {
164                    engine: engine.to_string(),
165                    region_storage_path: region_storage_path.to_string(),
166                    region_options: region_options.clone(),
167                    region_wal_options: region_wal_options.clone(),
168                },
169                table_route_value,
170                region_routes,
171                &region_options,
172                &region_wal_options,
173            )
174            .await
175            .context(error::TableMetadataManagerSnafu)
176        {
177            ctx.remove_table_route_value();
178            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
179                reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
180            });
181        };
182
183        ctx.remove_table_route_value();
184        ctx.deregister_failure_detectors().await;
185        // Consumes the guard.
186        ctx.volatile_ctx.opening_region_guard.take();
187
188        Ok(())
189    }
190}
191
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::region_keeper::MemoryRegionKeeper;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_time::util::current_time_millis;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::update_metadata::UpdateMetadata;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};

    /// Migration context shared by all tests: from peer 1 to peer 2,
    /// migrating region (table 1024, region 1).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    // No table metadata is created, so fetching the table route must fail
    // with a non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = UpdateMetadata::Upgrade;

        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The table route exists but only contains region 2, while the migration
    // targets region 1 — expect a non-retryable `RegionRouteNotFound`.
    #[tokio::test]
    async fn test_region_route_is_not_found() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![2]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(4)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The current leader (peer 3) does not match the expected old leader
    // (`from_peer` = 1) — expect a non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_region_route_expected_leader() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(3)),
            ..Default::default()
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("Unexpected region leader"));
    }

    // Happy path: downgrading leader 1 with followers {2, 3}. After the build,
    // the downgrading mark and `leader_down_since` are cleared, candidate 2
    // becomes leader and is removed from the followers.
    #[tokio::test]
    async fn test_build_upgrade_candidate_region_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: Some(current_time_millis()),
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let new_region_routes = state
            .build_upgrade_candidate_region_metadata(&mut ctx)
            .await
            .unwrap();

        assert!(!new_region_routes[0].is_leader_downgrading());
        assert!(new_region_routes[0].leader_down_since.is_none());
        assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]);
        assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }

    // The cached table route is made stale (the stored route is modified after
    // it was cached), so the compare-and-swap update fails. The error must be
    // retryable, the cached route must be dropped, and the opening-region
    // guard must be kept for the retry.
    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 1)),
                leader_peer: Some(Peer::empty(1)),
                follower_peers: vec![Peer::empty(5), Peer::empty(3)],
                leader_state: Some(LeaderState::Downgrading),
                leader_down_since: Some(current_time_millis()),
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(table_id, 2)),
                leader_peer: Some(Peer::empty(4)),
                leader_state: Some(LeaderState::Downgrading),
                ..Default::default()
            },
        ];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();
        let original_table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .unwrap()
            .unwrap();

        // modifies the table route.
        table_metadata_manager
            .update_leader_region_status(table_id, &original_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    // Removes the status.
                    Some(None)
                } else {
                    None
                }
            })
            .await
            .unwrap();

        // sets the old table route.
        ctx.volatile_ctx.table_route = Some(original_table_route);
        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);
        let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err();

        assert!(ctx.volatile_ctx.table_route.is_none());
        // The guard must survive the failure so a retry can still close it out.
        assert!(ctx.volatile_ctx.opening_region_guard.is_some());
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("Failed to update the table route"));
    }

    // Leader is still the old `from_peer`: metadata is NOT yet updated.
    #[tokio::test]
    async fn test_check_metadata() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let leader_peer = persistent_context.from_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(leader_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(!updated);
    }

    // Leader is already the candidate `to_peer` with no downgrading mark:
    // metadata IS updated.
    #[tokio::test]
    async fn test_check_metadata_updated() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: None,
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
        assert!(updated);
    }

    // Candidate is the leader but the route is still marked downgrading —
    // an inconsistent intermediate state surfaced as an `Unexpected` error.
    #[tokio::test]
    async fn test_check_metadata_intermediate_state() {
        let state = UpdateMetadata::Upgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let candidate_peer = persistent_context.to_peer.clone();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(candidate_peer),
            follower_peers: vec![Peer::empty(2), Peer::empty(3)],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
        }];

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("intermediate state"));
    }

    // Full state transition: `UpdateMetadata::Upgrade` succeeds, advances to
    // `CloseDowngradedRegion`, persists the new route (candidate 2 as leader,
    // no downgrading mark), clears the cached route and consumes the guard.
    #[tokio::test]
    async fn test_next_close_downgraded_region_state() {
        let mut state = Box::new(UpdateMetadata::Upgrade);
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_keeper = MemoryRegionKeeper::default();

        let table_id = 1024;
        let table_info = new_test_table_info(table_id, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(table_id, 1)),
            leader_peer: Some(Peer::empty(1)),
            leader_state: Some(LeaderState::Downgrading),
            ..Default::default()
        }];

        let guard = opening_keeper
            .register(2, RegionId::new(table_id, 1))
            .unwrap();
        ctx.volatile_ctx.opening_region_guard = Some(guard);

        env.create_physical_table_metadata(table_info, region_routes)
            .await;

        let table_metadata_manager = env.table_metadata_manager();

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<CloseDowngradedRegion>()
            .unwrap();

        let table_route = table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();
        let region_routes = table_route.region_routes().unwrap();

        assert!(ctx.volatile_ctx.table_route.is_none());
        assert!(ctx.volatile_ctx.opening_region_guard.is_none());
        assert_eq!(region_routes.len(), 1);
        assert!(!region_routes[0].is_leader_downgrading());
        assert!(region_routes[0].follower_peers.is_empty());
        assert_eq!(region_routes[0].leader_peer.as_ref().unwrap().id, 2);
    }
}
502}