meta_srv/procedure/region_migration/
update_metadata.rs1pub(crate) mod downgrade_leader_region;
16pub(crate) mod rollback_downgraded_region;
17pub(crate) mod upgrade_candidate_region;
18
19use std::any::Any;
20
21use common_meta::lock_key::TableLock;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::warn;
24use serde::{Deserialize, Serialize};
25
26use crate::error::Result;
27use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
28use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
29use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
30use crate::procedure::region_migration::{Context, State};
31
32#[derive(Debug, Serialize, Deserialize)]
33#[serde(tag = "UpdateMetadata")]
34pub enum UpdateMetadata {
35 Downgrade,
37 Upgrade,
39 Rollback,
41}
42
43#[async_trait::async_trait]
44#[typetag::serde]
45impl State for UpdateMetadata {
46 async fn next(
47 &mut self,
48 ctx: &mut Context,
49 procedure_ctx: &ProcedureContext,
50 ) -> Result<(Box<dyn State>, Status)> {
51 let table_id = TableLock::Write(ctx.region_id().table_id()).into();
52 let _guard = procedure_ctx.provider.acquire_lock(&table_id).await;
53
54 match self {
55 UpdateMetadata::Downgrade => {
56 self.downgrade_leader_region(ctx).await?;
57
58 Ok((
59 Box::<DowngradeLeaderRegion>::default(),
60 Status::executing(false),
61 ))
62 }
63 UpdateMetadata::Upgrade => {
64 self.upgrade_candidate_region(ctx).await?;
65
66 if let Err(err) = ctx.invalidate_table_cache().await {
67 warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
68 };
69 Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
70 }
71 UpdateMetadata::Rollback => {
72 self.rollback_downgraded_region(ctx).await?;
73
74 if let Err(err) = ctx.invalidate_table_cache().await {
75 warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}");
76 };
77 Ok((
78 Box::new(RegionMigrationAbort::new(
79 "Failed to upgrade the candidate region.",
80 )),
81 Status::executing(false),
82 ))
83 }
84 }
85 }
86
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90}