meta_srv/procedure/repartition/group/
update_metadata.rs1pub(crate) mod apply_staging_region;
16pub(crate) mod exit_staging_region;
17
18use std::any::Any;
19use std::time::Instant;
20
21use common_meta::lock_key::TableLock;
22use common_procedure::{Context as ProcedureContext, Status};
23use common_telemetry::warn;
24use serde::{Deserialize, Serialize};
25
26use crate::error::Result;
27use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
28use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
29use crate::procedure::repartition::group::{Context, State};
30
31#[derive(Debug, Serialize, Deserialize)]
32#[allow(clippy::enum_variant_names)]
33pub enum UpdateMetadata {
34 ApplyStaging,
36 ExitStaging,
38}
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for UpdateMetadata {
43 async fn next(
44 &mut self,
45 ctx: &mut Context,
46 procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 let timer = Instant::now();
49 let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into();
50 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
51 match self {
52 UpdateMetadata::ApplyStaging => {
53 self.apply_staging_regions(ctx).await?;
55
56 if let Err(err) = ctx.invalidate_table_cache().await {
57 warn!(
58 "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
59 );
60 };
61 ctx.update_update_metadata_elapsed(timer.elapsed());
62 Ok((Box::new(EnterStagingRegion), Status::executing(false)))
63 }
64 UpdateMetadata::ExitStaging => {
65 self.exit_staging_regions(ctx).await?;
66 if let Err(err) = ctx.invalidate_table_cache().await {
67 warn!(
68 err;
69 "Failed to broadcast the invalidate table cache message during the exit staging regions"
70 );
71 };
72 ctx.update_update_metadata_elapsed(timer.elapsed());
73 Ok((Box::new(RepartitionEnd), Status::executing(false)))
74 }
75 }
76 }
77
78 fn as_any(&self) -> &dyn Any {
79 self
80 }
81}