meta_srv/procedure/repartition/
collect.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18use common_telemetry::{error, info};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21
22use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24use crate::procedure::repartition::group::GroupId;
25use crate::procedure::repartition::{Context, State};
26
27#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
29pub struct ProcedureMeta {
30 pub plan_index: usize,
32 pub group_id: GroupId,
34 pub procedure_id: ProcedureId,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct Collect {
41 pub inflight_procedures: Vec<ProcedureMeta>,
43 pub succeeded_procedures: Vec<ProcedureMeta>,
45 pub failed_procedures: Vec<ProcedureMeta>,
47 pub unknown_procedures: Vec<ProcedureMeta>,
49}
50
51impl Collect {
52 pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53 Self {
54 inflight_procedures,
55 succeeded_procedures: Vec::new(),
56 failed_procedures: Vec::new(),
57 unknown_procedures: Vec::new(),
58 }
59 }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for Collect {
65 async fn next(
66 &mut self,
67 ctx: &mut Context,
68 procedure_ctx: &ProcedureContext,
69 ) -> Result<(Box<dyn State>, Status)> {
70 let table_id = ctx.persistent_ctx.table_id;
71 for procedure_meta in self.inflight_procedures.iter() {
72 let procedure_id = procedure_meta.procedure_id;
73 let group_id = procedure_meta.group_id;
74 let Some(mut receiver) = procedure_ctx
75 .provider
76 .procedure_state_receiver(procedure_id)
77 .await
78 .context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
79 else {
80 error!(
81 "failed to get procedure state receiver, procedure_id: {}, group_id: {}",
82 procedure_id, group_id
83 );
84 self.unknown_procedures.push(*procedure_meta);
85 continue;
86 };
87
88 match watcher::wait(&mut receiver).await {
89 Ok(_) => self.succeeded_procedures.push(*procedure_meta),
90 Err(e) => {
91 error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
92 self.failed_procedures.push(*procedure_meta);
93 }
94 }
95 }
96
97 let inflight = self.inflight_procedures.len();
98 let succeeded = self.succeeded_procedures.len();
99 let failed = self.failed_procedures.len();
100 let unknown = self.unknown_procedures.len();
101 info!(
102 "Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
103 table_id, inflight, succeeded, failed, unknown
104 );
105
106 if failed > 0 || unknown > 0 {
107 }
109
110 if let Some(start_time) = ctx.volatile_ctx.dispatch_start_time.take() {
111 ctx.update_finish_groups_elapsed(start_time.elapsed());
112 }
113
114 Ok((Box::new(DeallocateRegion), Status::executing(true)))
115 }
116
117 fn as_any(&self) -> &dyn Any {
118 self
119 }
120}