Skip to main content

meta_srv/procedure/repartition/group/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod apply_staging_region;
16pub(crate) mod exit_staging_region;
17
18use std::any::Any;
19use std::time::Instant;
20
21use common_meta::lock_key::TableLock;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::warn;
24use serde::{Deserialize, Serialize};
25
26use crate::error::Result;
27use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
28use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
29use crate::procedure::repartition::group::{Context, State};
30
31#[derive(Debug, Serialize, Deserialize)]
32#[allow(clippy::enum_variant_names)]
33pub enum UpdateMetadata {
34    /// Applies the new partition expressions for staging regions.
35    ApplyStaging,
36    /// Exits the staging regions after the group finishes its forward path.
37    ExitStaging,
38}
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for UpdateMetadata {
43    async fn next(
44        &mut self,
45        ctx: &mut Context,
46        procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        let timer = Instant::now();
49        let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into();
50        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
51        match self {
52            UpdateMetadata::ApplyStaging => {
53                // TODO(weny): If all metadata have already been updated, skip applying staging regions.
54                self.apply_staging_regions(ctx).await?;
55
56                if let Err(err) = ctx.invalidate_table_cache().await {
57                    warn!(
58                        "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
59                    );
60                };
61                ctx.update_update_metadata_elapsed(timer.elapsed());
62                Ok((Box::new(EnterStagingRegion), Status::executing(false)))
63            }
64            UpdateMetadata::ExitStaging => {
65                self.exit_staging_regions(ctx).await?;
66                if let Err(err) = ctx.invalidate_table_cache().await {
67                    warn!(
68                        err;
69                        "Failed to broadcast the invalidate table cache message during the exit staging regions"
70                    );
71                };
72                ctx.update_update_metadata_elapsed(timer.elapsed());
73                Ok((Box::new(RepartitionEnd), Status::executing(false)))
74            }
75        }
76    }
77
78    fn as_any(&self) -> &dyn Any {
79        self
80    }
81}