meta_srv/procedure/repartition/
dispatch.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
18use serde::{Deserialize, Serialize};
19
20use crate::error::Result;
21use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
22use crate::procedure::repartition::group::RepartitionGroupProcedure;
23use crate::procedure::repartition::{self, Context, State};
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct Dispatch;
27
28#[async_trait::async_trait]
29#[typetag::serde]
30impl State for Dispatch {
31 async fn next(
32 &mut self,
33 ctx: &mut Context,
34 _procedure_ctx: &ProcedureContext,
35 ) -> Result<(Box<dyn State>, Status)> {
36 let table_id = ctx.persistent_ctx.table_id;
37 let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
38 let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
39 for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
40 let persistent_ctx = repartition::group::PersistentContext::new(
41 plan.group_id,
42 table_id,
43 plan.source_regions.clone(),
44 plan.target_regions.clone(),
45 );
46
47 let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
48 let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure));
49 procedure_metas.push(ProcedureMeta {
50 plan_index,
51 group_id: plan.group_id,
52 procedure_id: procedure.id,
53 });
54 procedures.push(procedure);
55 }
56
57 Ok((
58 Box::new(Collect::new(procedure_metas)),
59 Status::suspended(procedures, true),
60 ))
61 }
62
63 fn as_any(&self) -> &dyn Any {
64 self
65 }
66}