meta_srv/procedure/repartition/group/
update_metadata.rs1pub(crate) mod apply_staging_region;
16pub(crate) mod rollback_staging_region;
17
18use std::any::Any;
19
20use common_procedure::{Context as ProcedureContext, Status};
21use common_telemetry::warn;
22use serde::{Deserialize, Serialize};
23
24use crate::error::Result;
25use crate::procedure::repartition::group::repartition_start::RepartitionStart;
26use crate::procedure::repartition::group::{Context, State};
27
28#[derive(Debug, Serialize, Deserialize)]
29pub enum UpdateMetadata {
30 ApplyStaging,
32 RollbackStaging,
34}
35
36impl UpdateMetadata {
37 #[allow(dead_code)]
38 fn next_state() -> (Box<dyn State>, Status) {
39 (Box::new(RepartitionStart), Status::executing(true))
41 }
42}
43
44#[async_trait::async_trait]
45#[typetag::serde]
46impl State for UpdateMetadata {
47 async fn next(
48 &mut self,
49 ctx: &mut Context,
50 _procedure_ctx: &ProcedureContext,
51 ) -> Result<(Box<dyn State>, Status)> {
52 match self {
53 UpdateMetadata::ApplyStaging => {
54 self.apply_staging_regions(ctx).await?;
56
57 if let Err(err) = ctx.invalidate_table_cache().await {
58 warn!(
59 "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
60 );
61 };
62 Ok(Self::next_state())
63 }
64 UpdateMetadata::RollbackStaging => {
65 self.rollback_staging_regions(ctx).await?;
66
67 if let Err(err) = ctx.invalidate_table_cache().await {
68 warn!(
69 "Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
70 );
71 };
72 Ok(Self::next_state())
73 }
74 }
75 }
76
77 fn as_any(&self) -> &dyn Any {
78 self
79 }
80}