meta_srv/procedure/repartition/
collect.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18use common_telemetry::{error, info};
19use serde::{Deserialize, Serialize};
20use snafu::ResultExt;
21
22use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24use crate::procedure::repartition::group::GroupId;
25use crate::procedure::repartition::{Context, State};
26
/// Identifies one repartition subprocedure tracked by [`Collect`].
///
/// The value is `Copy`, so it can be moved between the result buckets of
/// [`Collect`] without cloning.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct ProcedureMeta {
    /// Index of the plan this subprocedure was created for.
    /// NOTE(review): meaning inferred from the field name — confirm against
    /// the code that constructs `ProcedureMeta`.
    pub plan_index: usize,
    /// Id of the repartition group; reported in log messages when the
    /// subprocedure cannot be observed or fails.
    pub group_id: GroupId,
    /// Id of the subprocedure; used to obtain its state receiver from the
    /// procedure context provider.
    pub procedure_id: ProcedureId,
}
37
/// Repartition state that waits for dispatched subprocedures and sorts them
/// by outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Collect {
    /// Subprocedures that still have to be waited on.
    pub inflight_procedures: Vec<ProcedureMeta>,
    /// Subprocedures whose wait completed without an error.
    pub succeeded_procedures: Vec<ProcedureMeta>,
    /// Subprocedures whose wait returned an error.
    pub failed_procedures: Vec<ProcedureMeta>,
    /// Subprocedures for which no state receiver could be obtained.
    pub unknown_procedures: Vec<ProcedureMeta>,
}
50
51impl Collect {
52 pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53 Self {
54 inflight_procedures,
55 succeeded_procedures: Vec::new(),
56 failed_procedures: Vec::new(),
57 unknown_procedures: Vec::new(),
58 }
59 }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for Collect {
65 async fn next(
66 &mut self,
67 ctx: &mut Context,
68 procedure_ctx: &ProcedureContext,
69 ) -> Result<(Box<dyn State>, Status)> {
70 let table_id = ctx.persistent_ctx.table_id;
71 for procedure_meta in self.inflight_procedures.iter() {
72 let procedure_id = procedure_meta.procedure_id;
73 let group_id = procedure_meta.group_id;
74 let Some(mut receiver) = procedure_ctx
75 .provider
76 .procedure_state_receiver(procedure_id)
77 .await
78 .context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
79 else {
80 error!(
81 "failed to get procedure state receiver, procedure_id: {}, group_id: {}",
82 procedure_id, group_id
83 );
84 self.unknown_procedures.push(*procedure_meta);
85 continue;
86 };
87
88 match watcher::wait(&mut receiver).await {
89 Ok(_) => self.succeeded_procedures.push(*procedure_meta),
90 Err(e) => {
91 error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
92 self.failed_procedures.push(*procedure_meta);
93 }
94 }
95 }
96
97 let succeeded = self.succeeded_procedures.len();
98 let failed = self.failed_procedures.len();
99 let unknown = self.unknown_procedures.len();
100 info!(
101 "Collected repartition group results for table_id: {}, succeeded: {}, failed: {}, unknown: {}",
102 table_id, succeeded, failed, unknown
103 );
104
105 if failed > 0 || unknown > 0 {
106 ctx.persistent_ctx
107 .failed_procedures
108 .extend(self.failed_procedures.iter());
109 ctx.persistent_ctx
110 .unknown_procedures
111 .extend(self.unknown_procedures.iter());
112 return crate::error::UnexpectedSnafu {
113 violated: format!(
114 "Repartition groups failed or became unknown, table_id: {}, failed: {}, unknown: {}",
115 table_id, failed, unknown
116 ),
117 }
118 .fail();
119 }
120
121 if let Some(start_time) = ctx.volatile_ctx.dispatch_start_time.take() {
122 ctx.update_finish_groups_elapsed(start_time.elapsed());
123 }
124
125 Ok((Box::new(DeallocateRegion), Status::executing(true)))
126 }
127
128 fn as_any(&self) -> &dyn Any {
129 self
130 }
131}
132
133#[cfg(test)]
134mod tests {
135 use std::sync::Arc;
136
137 use common_error::mock::MockError;
138 use common_error::status_code::StatusCode;
139 use common_meta::test_util::MockDatanodeManager;
140 use common_procedure::{
141 Context as ProcedureContext, ContextProvider, Error as ProcedureError, ProcedureId,
142 ProcedureState,
143 };
144 use common_procedure_test::MockContextProvider;
145 use tokio::sync::watch;
146
147 use super::*;
148 use crate::procedure::repartition::PersistentContext;
149 use crate::procedure::repartition::test_util::TestingEnv;
150
    /// Test-only context provider that hands out a clone of `receiver` for
    /// every procedure id and delegates everything else to `inner`.
    struct FailedProcedureContextProvider {
        // Receiver returned (cloned) from `procedure_state_receiver`.
        receiver: watch::Receiver<ProcedureState>,
        // Mock provider that serves all other requests.
        inner: MockContextProvider,
    }
155
156 #[async_trait::async_trait]
157 impl ContextProvider for FailedProcedureContextProvider {
158 async fn procedure_state(
159 &self,
160 procedure_id: ProcedureId,
161 ) -> common_procedure::Result<Option<ProcedureState>> {
162 self.inner.procedure_state(procedure_id).await
163 }
164
165 async fn procedure_state_receiver(
166 &self,
167 _procedure_id: ProcedureId,
168 ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
169 Ok(Some(self.receiver.clone()))
170 }
171
172 async fn try_put_poison(
173 &self,
174 key: &common_procedure::PoisonKey,
175 procedure_id: ProcedureId,
176 ) -> common_procedure::Result<()> {
177 self.inner.try_put_poison(key, procedure_id).await
178 }
179
180 async fn acquire_lock(
181 &self,
182 key: &common_procedure::StringKey,
183 ) -> common_procedure::local::DynamicKeyLockGuard {
184 self.inner.acquire_lock(key).await
185 }
186 }
187
188 #[tokio::test]
189 async fn test_collect_returns_error_when_unknown_exists() {
190 let env = TestingEnv::new();
191 let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(())));
192 let persistent_ctx = PersistentContext::new(
193 table::table_name::TableName::new("test_catalog", "test_schema", "test_table"),
194 1024,
195 None,
196 );
197 let mut ctx = crate::procedure::repartition::Context::new(
198 &ddl_ctx,
199 env.mailbox_ctx.mailbox().clone(),
200 env.server_addr.clone(),
201 persistent_ctx,
202 );
203 let mut state = Collect {
204 inflight_procedures: vec![],
205 succeeded_procedures: vec![],
206 failed_procedures: vec![],
207 unknown_procedures: vec![ProcedureMeta {
208 plan_index: 0,
209 group_id: uuid::Uuid::new_v4(),
210 procedure_id: common_procedure::ProcedureId::random(),
211 }],
212 };
213
214 let err = state
215 .next(&mut ctx, &TestingEnv::procedure_context())
216 .await
217 .unwrap_err();
218
219 assert!(!err.is_retryable());
220 }
221
222 #[tokio::test]
223 async fn test_collect_returns_error_when_failed_exists() {
224 let env = TestingEnv::new();
225 let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(())));
226 let persistent_ctx = PersistentContext::new(
227 table::table_name::TableName::new("test_catalog", "test_schema", "test_table"),
228 1024,
229 None,
230 );
231 let mut ctx = crate::procedure::repartition::Context::new(
232 &ddl_ctx,
233 env.mailbox_ctx.mailbox().clone(),
234 env.server_addr.clone(),
235 persistent_ctx,
236 );
237 let procedure_id = common_procedure::ProcedureId::random();
238 let (tx, rx) = watch::channel(ProcedureState::Running);
239 tx.send(ProcedureState::failed(Arc::new(ProcedureError::external(
240 MockError::new(StatusCode::Internal),
241 ))))
242 .unwrap();
243 let procedure_ctx = ProcedureContext {
244 procedure_id: ProcedureId::random(),
245 provider: Arc::new(FailedProcedureContextProvider {
246 receiver: rx,
247 inner: MockContextProvider::default(),
248 }),
249 };
250 let mut state = Collect {
251 inflight_procedures: vec![ProcedureMeta {
252 plan_index: 0,
253 group_id: uuid::Uuid::new_v4(),
254 procedure_id,
255 }],
256 succeeded_procedures: vec![],
257 failed_procedures: vec![],
258 unknown_procedures: vec![],
259 };
260
261 let err = state.next(&mut ctx, &procedure_ctx).await.unwrap_err();
262
263 assert_eq!(state.failed_procedures.len(), 1);
264 assert_eq!(state.unknown_procedures.len(), 0);
265 assert!(!err.is_retryable());
266 }
267}