meta_srv/procedure/region_migration/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod downgrade_leader_region;
16pub(crate) mod rollback_downgraded_region;
17pub(crate) mod upgrade_candidate_region;
18
19use std::any::Any;
20
21use common_procedure::{Context as ProcedureContext, Status};
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24
25use crate::error::Result;
26use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
27use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
28use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
29use crate::procedure::region_migration::{Context, State};
30
31#[derive(Debug, Serialize, Deserialize)]
32#[serde(tag = "UpdateMetadata")]
33pub enum UpdateMetadata {
34    /// Downgrades the leader region.
35    Downgrade,
36    /// Upgrades the candidate region.
37    Upgrade,
38    /// Rolls back the downgraded region.
39    Rollback,
40}
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for UpdateMetadata {
45    async fn next(
46        &mut self,
47        ctx: &mut Context,
48        procedure_ctx: &ProcedureContext,
49    ) -> Result<(Box<dyn State>, Status)> {
50        match self {
51            UpdateMetadata::Downgrade => {
52                self.downgrade_leader_region(ctx, &procedure_ctx.provider)
53                    .await?;
54
55                Ok((
56                    Box::<DowngradeLeaderRegion>::default(),
57                    Status::executing(false),
58                ))
59            }
60            UpdateMetadata::Upgrade => {
61                self.upgrade_candidate_region(ctx, &procedure_ctx.provider)
62                    .await?;
63
64                if let Err(err) = ctx.invalidate_table_cache().await {
65                    warn!(
66                        "Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}"
67                    );
68                };
69                Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
70            }
71            UpdateMetadata::Rollback => {
72                self.rollback_downgraded_region(ctx, &procedure_ctx.provider)
73                    .await?;
74
75                if let Err(err) = ctx.invalidate_table_cache().await {
76                    warn!(
77                        "Failed to broadcast the invalidate table cache message during the rollback, error: {err:?}"
78                    );
79                };
80                Ok((
81                    Box::new(RegionMigrationAbort::new(
82                        "Failed to upgrade the candidate region.",
83                    )),
84                    Status::executing(false),
85                ))
86            }
87        }
88    }
89
90    fn as_any(&self) -> &dyn Any {
91        self
92    }
93}