1use std::any::Any;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use snafu::{Location, Snafu};
22
23use crate::procedure::ProcedureId;
24use crate::PoisonKey;
25
26#[derive(Snafu)]
28#[snafu(visibility(pub))]
29#[stack_trace_debug]
30pub enum Error {
31 #[snafu(display("Failed to check procedure manager status"))]
32 CheckStatus {
33 source: BoxedError,
34 #[snafu(implicit)]
35 location: Location,
36 },
37
38 #[snafu(display("Manager is pasued"))]
39 ManagerPasued {
40 #[snafu(implicit)]
41 location: Location,
42 },
43
44 #[snafu(display(
45 "Failed to execute procedure due to external error, clean poisons: {}",
46 clean_poisons
47 ))]
48 External {
49 source: BoxedError,
50 clean_poisons: bool,
51 },
52
53 #[snafu(display("Loader {} is already registered", name))]
54 LoaderConflict {
55 name: String,
56 #[snafu(implicit)]
57 location: Location,
58 },
59
60 #[snafu(display("Procedure Manager is stopped"))]
61 ManagerNotStart {
62 #[snafu(implicit)]
63 location: Location,
64 },
65
66 #[snafu(display("Failed to serialize to json"))]
67 ToJson {
68 #[snafu(source)]
69 error: serde_json::Error,
70 #[snafu(implicit)]
71 location: Location,
72 },
73
74 #[snafu(display("Procedure {} already exists", procedure_id))]
75 DuplicateProcedure {
76 procedure_id: ProcedureId,
77 #[snafu(implicit)]
78 location: Location,
79 },
80
81 #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
82 TooManyRunningProcedures {
83 max_running_procedures: usize,
84 #[snafu(implicit)]
85 location: Location,
86 },
87
88 #[snafu(display("Failed to put state, key: '{key}'"))]
89 PutState {
90 key: String,
91 #[snafu(implicit)]
92 location: Location,
93 source: BoxedError,
94 },
95
96 #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
97 PutPoison {
98 key: String,
99 token: String,
100 #[snafu(implicit)]
101 location: Location,
102 source: BoxedError,
103 },
104
105 #[snafu(display("Failed to get poison, key: '{key}'"))]
106 GetPoison {
107 key: String,
108 #[snafu(implicit)]
109 location: Location,
110 source: BoxedError,
111 },
112
113 #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
114 DeletePoison {
115 key: String,
116 token: String,
117 #[snafu(implicit)]
118 location: Location,
119 source: BoxedError,
120 },
121
122 #[snafu(display("Failed to delete {}", key))]
123 DeleteState {
124 key: String,
125 #[snafu(source)]
126 error: object_store::Error,
127 },
128
129 #[snafu(display("Failed to delete keys: '{keys}'"))]
130 DeleteStates {
131 keys: String,
132 #[snafu(implicit)]
133 location: Location,
134 source: BoxedError,
135 },
136
137 #[snafu(display("Failed to list state, path: '{path}'"))]
138 ListState {
139 path: String,
140 #[snafu(implicit)]
141 location: Location,
142 source: BoxedError,
143 },
144
145 #[snafu(display("Failed to deserialize from json"))]
146 FromJson {
147 #[snafu(source)]
148 error: serde_json::Error,
149 #[snafu(implicit)]
150 location: Location,
151 },
152
153 #[snafu(display("Procedure exec failed"))]
154 RetryLater {
155 source: BoxedError,
156 clean_poisons: bool,
157 },
158
159 #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
160 ProcedurePanic { procedure_id: ProcedureId },
161
162 #[snafu(display("Failed to wait watcher"))]
163 WaitWatcher {
164 #[snafu(source)]
165 error: tokio::sync::watch::error::RecvError,
166 #[snafu(implicit)]
167 location: Location,
168 },
169
170 #[snafu(display("Failed to execute procedure"))]
171 ProcedureExec {
172 source: Arc<Error>,
173 #[snafu(implicit)]
174 location: Location,
175 },
176
177 #[snafu(display("Rollback Procedure recovered: {error}"))]
178 RollbackProcedureRecovered {
179 error: String,
180 #[snafu(implicit)]
181 location: Location,
182 },
183
184 #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
185 RetryTimesExceeded {
186 source: Arc<Error>,
187 procedure_id: ProcedureId,
188 },
189
190 #[snafu(display(
191 "Procedure rollback exceeded max times, procedure_id: {}",
192 procedure_id
193 ))]
194 RollbackTimesExceeded {
195 source: Arc<Error>,
196 procedure_id: ProcedureId,
197 },
198
199 #[snafu(display("Failed to start the remove_outdated_meta method, error"))]
200 StartRemoveOutdatedMetaTask {
201 source: common_runtime::error::Error,
202 #[snafu(implicit)]
203 location: Location,
204 },
205
206 #[snafu(display("Failed to stop the remove_outdated_meta method, error"))]
207 StopRemoveOutdatedMetaTask {
208 source: common_runtime::error::Error,
209 #[snafu(implicit)]
210 location: Location,
211 },
212
213 #[snafu(display("Failed to parse segment key: {key}"))]
214 ParseSegmentKey {
215 #[snafu(implicit)]
216 location: Location,
217 key: String,
218 #[snafu(source)]
219 error: std::num::ParseIntError,
220 },
221
222 #[snafu(display("Unexpected: {err_msg}"))]
223 Unexpected {
224 #[snafu(implicit)]
225 location: Location,
226 err_msg: String,
227 },
228
229 #[snafu(display("Not support to rollback the procedure"))]
230 RollbackNotSupported {
231 #[snafu(implicit)]
232 location: Location,
233 },
234
235 #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
236 ProcedureNotFound {
237 procedure_id: ProcedureId,
238 #[snafu(implicit)]
239 location: Location,
240 },
241
242 #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
243 PoisonKeyNotDefined {
244 key: PoisonKey,
245 procedure_id: ProcedureId,
246 #[snafu(implicit)]
247 location: Location,
248 },
249}
250
251pub type Result<T> = std::result::Result<T, Error>;
252
253impl ErrorExt for Error {
254 fn status_code(&self) -> StatusCode {
255 match self {
256 Error::External { source, .. }
257 | Error::PutState { source, .. }
258 | Error::DeleteStates { source, .. }
259 | Error::ListState { source, .. }
260 | Error::PutPoison { source, .. }
261 | Error::DeletePoison { source, .. }
262 | Error::GetPoison { source, .. }
263 | Error::CheckStatus { source, .. } => source.status_code(),
264
265 Error::ToJson { .. }
266 | Error::DeleteState { .. }
267 | Error::FromJson { .. }
268 | Error::WaitWatcher { .. }
269 | Error::RetryLater { .. }
270 | Error::RollbackProcedureRecovered { .. }
271 | Error::TooManyRunningProcedures { .. }
272 | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
273
274 Error::RetryTimesExceeded { .. }
275 | Error::RollbackTimesExceeded { .. }
276 | Error::ManagerNotStart { .. }
277 | Error::ManagerPasued { .. } => StatusCode::IllegalState,
278
279 Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
280 Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
281 StatusCode::InvalidArguments
282 }
283 Error::ProcedurePanic { .. }
284 | Error::ParseSegmentKey { .. }
285 | Error::Unexpected { .. }
286 | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
287 Error::ProcedureExec { source, .. } => source.status_code(),
288 Error::StartRemoveOutdatedMetaTask { source, .. }
289 | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
290 }
291 }
292
293 fn as_any(&self) -> &dyn Any {
294 self
295 }
296}
297
298impl Error {
299 pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
301 Error::External {
302 source: BoxedError::new(err),
303 clean_poisons: false,
304 }
305 }
306
307 pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
309 Error::External {
310 source: BoxedError::new(err),
311 clean_poisons: true,
312 }
313 }
314
315 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
317 Error::RetryLater {
318 source: BoxedError::new(err),
319 clean_poisons: false,
320 }
321 }
322
323 pub fn retry_later_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
325 Error::RetryLater {
326 source: BoxedError::new(err),
327 clean_poisons: true,
328 }
329 }
330
331 pub fn is_retry_later(&self) -> bool {
333 matches!(self, Error::RetryLater { .. })
334 }
335
336 pub fn need_clean_poisons(&self) -> bool {
338 matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
339 || matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
340 }
341
342 pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
345 if err.status_code().is_retryable() {
346 Error::retry_later(err)
347 } else {
348 Error::external(err)
349 }
350 }
351}