meta_srv/procedure/region_migration/
update_metadata.rs1pub(crate) mod downgrade_leader_region;
16pub(crate) mod rollback_downgraded_region;
17pub(crate) mod upgrade_candidate_region;
18
19use std::any::Any;
20
21use common_meta::lock_key::TableLock;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::warn;
24use serde::{Deserialize, Serialize};
25
26use crate::error::Result;
27use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
28use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
29use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
30use crate::procedure::region_migration::{Context, State};
31
32#[derive(Debug, Serialize, Deserialize)]
33#[serde(tag = "UpdateMetadata")]
34pub enum UpdateMetadata {
35 Downgrade,
37 Upgrade,
39 Rollback,
41}
42
43#[async_trait::async_trait]
44#[typetag::serde]
45impl State for UpdateMetadata {
46 async fn next(
47 &mut self,
48 ctx: &mut Context,
49 procedure_ctx: &ProcedureContext,
50 ) -> Result<(Box<dyn State>, Status)> {
51 let table_id = TableLock::Write(ctx.region_id().table_id()).into();
52 let _guard = procedure_ctx.provider.acquire_lock(&table_id).await;
53
54 match self {
55 UpdateMetadata::Downgrade => {
56 self.downgrade_leader_region(ctx).await?;
57
58 Ok((
59 Box::<DowngradeLeaderRegion>::default(),
60 Status::executing(false),
61 ))
62 }
63 UpdateMetadata::Upgrade => {
64 self.upgrade_candidate_region(ctx).await?;
65
66 if let Err(err) = ctx.invalidate_table_cache().await {
67 warn!(
68 "Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"
69 );
70 };
71 Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
72 }
73 UpdateMetadata::Rollback => {
74 self.rollback_downgraded_region(ctx).await?;
75
76 if let Err(err) = ctx.invalidate_table_cache().await {
77 warn!(
78 "Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}"
79 );
80 };
81 Ok((
82 Box::new(RegionMigrationAbort::new(
83 "Failed to upgrade the candidate region.",
84 )),
85 Status::executing(false),
86 ))
87 }
88 }
89 }
90
91 fn as_any(&self) -> &dyn Any {
92 self
93 }
94}