meta_srv/procedure/repartition/group/
update_metadata.rs1pub(crate) mod apply_staging_region;
16pub(crate) mod exit_staging_region;
17pub(crate) mod rollback_staging_region;
18
19use std::any::Any;
20use std::time::Instant;
21
22use common_meta::lock_key::TableLock;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::warn;
25use serde::{Deserialize, Serialize};
26
27use crate::error::Result;
28use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
29use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
30use crate::procedure::repartition::group::{Context, State};
31
32#[derive(Debug, Serialize, Deserialize)]
33#[allow(clippy::enum_variant_names)]
34pub enum UpdateMetadata {
35 ApplyStaging,
37 RollbackStaging,
39 ExitStaging,
41}
42
43#[async_trait::async_trait]
44#[typetag::serde]
45impl State for UpdateMetadata {
46 async fn next(
47 &mut self,
48 ctx: &mut Context,
49 procedure_ctx: &ProcedureContext,
50 ) -> Result<(Box<dyn State>, Status)> {
51 let timer = Instant::now();
52 let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into();
53 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
54 match self {
55 UpdateMetadata::ApplyStaging => {
56 self.apply_staging_regions(ctx).await?;
58
59 if let Err(err) = ctx.invalidate_table_cache().await {
60 warn!(
61 "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
62 );
63 };
64 ctx.update_update_metadata_elapsed(timer.elapsed());
65 Ok((Box::new(EnterStagingRegion), Status::executing(false)))
66 }
67 UpdateMetadata::RollbackStaging => {
68 self.rollback_staging_regions(ctx).await?;
69
70 if let Err(err) = ctx.invalidate_table_cache().await {
71 warn!(
72 err;
73 "Failed to broadcast the invalidate table cache message during the rollback staging regions"
74 );
75 };
76 Ok((Box::new(RepartitionEnd), Status::executing(false)))
77 }
78 UpdateMetadata::ExitStaging => {
79 self.exit_staging_regions(ctx).await?;
80 if let Err(err) = ctx.invalidate_table_cache().await {
81 warn!(
82 err;
83 "Failed to broadcast the invalidate table cache message during the exit staging regions"
84 );
85 };
86 ctx.update_update_metadata_elapsed(timer.elapsed());
87 Ok((Box::new(RepartitionEnd), Status::executing(false)))
88 }
89 }
90 }
91
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95}