meta_srv/procedure/region_migration/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod downgrade_leader_region;
16pub(crate) mod rollback_downgraded_region;
17pub(crate) mod upgrade_candidate_region;
18
19use std::any::Any;
20
21use common_meta::lock_key::TableLock;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::warn;
24use serde::{Deserialize, Serialize};
25
26use crate::error::Result;
27use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
28use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
29use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
30use crate::procedure::region_migration::{Context, State};
31
32#[derive(Debug, Serialize, Deserialize)]
33#[serde(tag = "UpdateMetadata")]
34pub enum UpdateMetadata {
35    /// Downgrades the leader region.
36    Downgrade,
37    /// Upgrades the candidate region.
38    Upgrade,
39    /// Rolls back the downgraded region.
40    Rollback,
41}
42
43#[async_trait::async_trait]
44#[typetag::serde]
45impl State for UpdateMetadata {
46    async fn next(
47        &mut self,
48        ctx: &mut Context,
49        procedure_ctx: &ProcedureContext,
50    ) -> Result<(Box<dyn State>, Status)> {
51        let table_id = TableLock::Write(ctx.region_id().table_id()).into();
52        let _guard = procedure_ctx.provider.acquire_lock(&table_id).await;
53
54        match self {
55            UpdateMetadata::Downgrade => {
56                self.downgrade_leader_region(ctx).await?;
57
58                Ok((
59                    Box::<DowngradeLeaderRegion>::default(),
60                    Status::executing(false),
61                ))
62            }
63            UpdateMetadata::Upgrade => {
64                self.upgrade_candidate_region(ctx).await?;
65
66                if let Err(err) = ctx.invalidate_table_cache().await {
67                    warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
68                };
69                Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
70            }
71            UpdateMetadata::Rollback => {
72                self.rollback_downgraded_region(ctx).await?;
73
74                if let Err(err) = ctx.invalidate_table_cache().await {
75                    warn!("Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}");
76                };
77                Ok((
78                    Box::new(RegionMigrationAbort::new(
79                        "Failed to upgrade the candidate region.",
80                    )),
81                    Status::executing(false),
82                ))
83            }
84        }
85    }
86
87    fn as_any(&self) -> &dyn Any {
88        self
89    }
90}