meta_srv/procedure/repartition/group/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod apply_staging_region;
16pub(crate) mod exit_staging_region;
17pub(crate) mod rollback_staging_region;
18
19use std::any::Any;
20use std::time::Instant;
21
22use common_meta::lock_key::TableLock;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::warn;
25use serde::{Deserialize, Serialize};
26
27use crate::error::Result;
28use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
29use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
30use crate::procedure::repartition::group::{Context, State};
31
/// Metadata-update step of the repartition group procedure.
///
/// Each variant selects which metadata update the `State::next` implementation
/// in this file performs, and which state it moves to afterwards.
#[derive(Debug, Serialize, Deserialize)]
// Every variant ends in `Staging`, which is the pattern `clippy::enum_variant_names`
// reports; the shared suffix is kept on purpose, so the lint is silenced here.
#[allow(clippy::enum_variant_names)]
pub enum UpdateMetadata {
    /// Applies the new partition expressions for staging regions.
    ///
    /// On success the next state is `EnterStagingRegion`.
    ApplyStaging,
    /// Rolls back the new partition expressions for staging regions.
    ///
    /// On success the next state is `RepartitionEnd`.
    RollbackStaging,
    /// Exits the staging regions.
    ///
    /// On success the next state is `RepartitionEnd`.
    ExitStaging,
}
42
43#[async_trait::async_trait]
44#[typetag::serde]
45impl State for UpdateMetadata {
46    async fn next(
47        &mut self,
48        ctx: &mut Context,
49        procedure_ctx: &ProcedureContext,
50    ) -> Result<(Box<dyn State>, Status)> {
51        let timer = Instant::now();
52        let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into();
53        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
54        match self {
55            UpdateMetadata::ApplyStaging => {
56                // TODO(weny): If all metadata have already been updated, skip applying staging regions.
57                self.apply_staging_regions(ctx).await?;
58
59                if let Err(err) = ctx.invalidate_table_cache().await {
60                    warn!(
61                        "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
62                    );
63                };
64                ctx.update_update_metadata_elapsed(timer.elapsed());
65                Ok((Box::new(EnterStagingRegion), Status::executing(false)))
66            }
67            UpdateMetadata::RollbackStaging => {
68                self.rollback_staging_regions(ctx).await?;
69
70                if let Err(err) = ctx.invalidate_table_cache().await {
71                    warn!(
72                        err;
73                        "Failed to broadcast the invalidate table cache message during the rollback staging regions"
74                    );
75                };
76                Ok((Box::new(RepartitionEnd), Status::executing(false)))
77            }
78            UpdateMetadata::ExitStaging => {
79                self.exit_staging_regions(ctx).await?;
80                if let Err(err) = ctx.invalidate_table_cache().await {
81                    warn!(
82                        err;
83                        "Failed to broadcast the invalidate table cache message during the exit staging regions"
84                    );
85                };
86                ctx.update_update_metadata_elapsed(timer.elapsed());
87                Ok((Box::new(RepartitionEnd), Status::executing(false)))
88            }
89        }
90    }
91
92    fn as_any(&self) -> &dyn Any {
93        self
94    }
95}