meta_srv/procedure/repartition/
collect.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18use common_telemetry::error;
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21
22use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24use crate::procedure::repartition::group::GroupId;
25use crate::procedure::repartition::{Context, State};
26
27#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
29pub struct ProcedureMeta {
30 pub plan_index: usize,
32 pub group_id: GroupId,
34 pub procedure_id: ProcedureId,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct Collect {
41 pub inflight_procedures: Vec<ProcedureMeta>,
43 pub succeeded_procedures: Vec<ProcedureMeta>,
45 pub failed_procedures: Vec<ProcedureMeta>,
47 pub unknown_procedures: Vec<ProcedureMeta>,
49}
50
51impl Collect {
52 pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53 Self {
54 inflight_procedures,
55 succeeded_procedures: Vec::new(),
56 failed_procedures: Vec::new(),
57 unknown_procedures: Vec::new(),
58 }
59 }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for Collect {
65 async fn next(
66 &mut self,
67 _ctx: &mut Context,
68 procedure_ctx: &ProcedureContext,
69 ) -> Result<(Box<dyn State>, Status)> {
70 for procedure_meta in self.inflight_procedures.iter() {
71 let procedure_id = procedure_meta.procedure_id;
72 let group_id = procedure_meta.group_id;
73 let Some(mut receiver) = procedure_ctx
74 .provider
75 .procedure_state_receiver(procedure_id)
76 .await
77 .context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
78 else {
79 error!(
80 "failed to get procedure state receiver, procedure_id: {}, group_id: {}",
81 procedure_id, group_id
82 );
83 self.unknown_procedures.push(*procedure_meta);
84 continue;
85 };
86
87 match watcher::wait(&mut receiver).await {
88 Ok(_) => self.succeeded_procedures.push(*procedure_meta),
89 Err(e) => {
90 error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
91 self.failed_procedures.push(*procedure_meta);
92 }
93 }
94 }
95
96 if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
97 }
99
100 Ok((Box::new(DeallocateRegion), Status::executing(true)))
101 }
102
103 fn as_any(&self) -> &dyn Any {
104 self
105 }
106}