meta_srv/procedure/repartition/group/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod apply_staging_region;
16pub(crate) mod rollback_staging_region;
17
18use std::any::Any;
19
20use common_procedure::{Context as ProcedureContext, Status};
21use common_telemetry::warn;
22use serde::{Deserialize, Serialize};
23
24use crate::error::Result;
25use crate::procedure::repartition::group::repartition_start::RepartitionStart;
26use crate::procedure::repartition::group::{Context, State};
27
28#[derive(Debug, Serialize, Deserialize)]
29pub enum UpdateMetadata {
30    /// Applies the new partition expressions for staging regions.
31    ApplyStaging,
32    /// Rolls back the new partition expressions for staging regions.
33    RollbackStaging,
34}
35
36impl UpdateMetadata {
37    #[allow(dead_code)]
38    fn next_state() -> (Box<dyn State>, Status) {
39        // TODO(weny): change it later.
40        (Box::new(RepartitionStart), Status::executing(true))
41    }
42}
43
44#[async_trait::async_trait]
45#[typetag::serde]
46impl State for UpdateMetadata {
47    async fn next(
48        &mut self,
49        ctx: &mut Context,
50        _procedure_ctx: &ProcedureContext,
51    ) -> Result<(Box<dyn State>, Status)> {
52        match self {
53            UpdateMetadata::ApplyStaging => {
54                // TODO(weny): If all metadata have already been updated, skip applying staging regions.
55                self.apply_staging_regions(ctx).await?;
56
57                if let Err(err) = ctx.invalidate_table_cache().await {
58                    warn!(
59                        "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
60                    );
61                };
62                Ok(Self::next_state())
63            }
64            UpdateMetadata::RollbackStaging => {
65                self.rollback_staging_regions(ctx).await?;
66
67                if let Err(err) = ctx.invalidate_table_cache().await {
68                    warn!(
69                        "Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
70                    );
71                };
72                Ok(Self::next_state())
73            }
74        }
75    }
76
77    fn as_any(&self) -> &dyn Any {
78        self
79    }
80}