1use std::any::Any;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use snafu::{Location, Snafu};
22
23use crate::procedure::ProcedureId;
24use crate::PoisonKey;
25
26#[derive(Snafu)]
28#[snafu(visibility(pub))]
29#[stack_trace_debug]
30pub enum Error {
31 #[snafu(display(
32 "Failed to execute procedure due to external error, clean poisons: {}",
33 clean_poisons
34 ))]
35 External {
36 source: BoxedError,
37 clean_poisons: bool,
38 },
39
40 #[snafu(display("Loader {} is already registered", name))]
41 LoaderConflict {
42 name: String,
43 #[snafu(implicit)]
44 location: Location,
45 },
46
47 #[snafu(display("Procedure Manager is stopped"))]
48 ManagerNotStart {
49 #[snafu(implicit)]
50 location: Location,
51 },
52
53 #[snafu(display("Failed to serialize to json"))]
54 ToJson {
55 #[snafu(source)]
56 error: serde_json::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Procedure {} already exists", procedure_id))]
62 DuplicateProcedure {
63 procedure_id: ProcedureId,
64 #[snafu(implicit)]
65 location: Location,
66 },
67
68 #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
69 TooManyRunningProcedures {
70 max_running_procedures: usize,
71 #[snafu(implicit)]
72 location: Location,
73 },
74
75 #[snafu(display("Failed to put state, key: '{key}'"))]
76 PutState {
77 key: String,
78 #[snafu(implicit)]
79 location: Location,
80 source: BoxedError,
81 },
82
83 #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
84 PutPoison {
85 key: String,
86 token: String,
87 #[snafu(implicit)]
88 location: Location,
89 source: BoxedError,
90 },
91
92 #[snafu(display("Failed to get poison, key: '{key}'"))]
93 GetPoison {
94 key: String,
95 #[snafu(implicit)]
96 location: Location,
97 source: BoxedError,
98 },
99
100 #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
101 DeletePoison {
102 key: String,
103 token: String,
104 #[snafu(implicit)]
105 location: Location,
106 source: BoxedError,
107 },
108
109 #[snafu(display("Failed to delete {}", key))]
110 DeleteState {
111 key: String,
112 #[snafu(source)]
113 error: object_store::Error,
114 },
115
116 #[snafu(display("Failed to delete keys: '{keys}'"))]
117 DeleteStates {
118 keys: String,
119 #[snafu(implicit)]
120 location: Location,
121 source: BoxedError,
122 },
123
124 #[snafu(display("Failed to list state, path: '{path}'"))]
125 ListState {
126 path: String,
127 #[snafu(implicit)]
128 location: Location,
129 source: BoxedError,
130 },
131
132 #[snafu(display("Failed to deserialize from json"))]
133 FromJson {
134 #[snafu(source)]
135 error: serde_json::Error,
136 #[snafu(implicit)]
137 location: Location,
138 },
139
140 #[snafu(display("Procedure exec failed"))]
141 RetryLater { source: BoxedError },
142
143 #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
144 ProcedurePanic { procedure_id: ProcedureId },
145
146 #[snafu(display("Failed to wait watcher"))]
147 WaitWatcher {
148 #[snafu(source)]
149 error: tokio::sync::watch::error::RecvError,
150 #[snafu(implicit)]
151 location: Location,
152 },
153
154 #[snafu(display("Failed to execute procedure"))]
155 ProcedureExec {
156 source: Arc<Error>,
157 #[snafu(implicit)]
158 location: Location,
159 },
160
161 #[snafu(display("Rollback Procedure recovered: {error}"))]
162 RollbackProcedureRecovered {
163 error: String,
164 #[snafu(implicit)]
165 location: Location,
166 },
167
168 #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
169 RetryTimesExceeded {
170 source: Arc<Error>,
171 procedure_id: ProcedureId,
172 },
173
174 #[snafu(display(
175 "Procedure rollback exceeded max times, procedure_id: {}",
176 procedure_id
177 ))]
178 RollbackTimesExceeded {
179 source: Arc<Error>,
180 procedure_id: ProcedureId,
181 },
182
183 #[snafu(display("Failed to start the remove_outdated_meta method, error"))]
184 StartRemoveOutdatedMetaTask {
185 source: common_runtime::error::Error,
186 #[snafu(implicit)]
187 location: Location,
188 },
189
190 #[snafu(display("Failed to stop the remove_outdated_meta method, error"))]
191 StopRemoveOutdatedMetaTask {
192 source: common_runtime::error::Error,
193 #[snafu(implicit)]
194 location: Location,
195 },
196
197 #[snafu(display("Failed to parse segment key: {key}"))]
198 ParseSegmentKey {
199 #[snafu(implicit)]
200 location: Location,
201 key: String,
202 #[snafu(source)]
203 error: std::num::ParseIntError,
204 },
205
206 #[snafu(display("Unexpected: {err_msg}"))]
207 Unexpected {
208 #[snafu(implicit)]
209 location: Location,
210 err_msg: String,
211 },
212
213 #[snafu(display("Not support to rollback the procedure"))]
214 RollbackNotSupported {
215 #[snafu(implicit)]
216 location: Location,
217 },
218
219 #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
220 ProcedureNotFound {
221 procedure_id: ProcedureId,
222 #[snafu(implicit)]
223 location: Location,
224 },
225
226 #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
227 PoisonKeyNotDefined {
228 key: PoisonKey,
229 procedure_id: ProcedureId,
230 #[snafu(implicit)]
231 location: Location,
232 },
233}
234
235pub type Result<T> = std::result::Result<T, Error>;
236
237impl ErrorExt for Error {
238 fn status_code(&self) -> StatusCode {
239 match self {
240 Error::External { source, .. }
241 | Error::PutState { source, .. }
242 | Error::DeleteStates { source, .. }
243 | Error::ListState { source, .. }
244 | Error::PutPoison { source, .. }
245 | Error::DeletePoison { source, .. }
246 | Error::GetPoison { source, .. } => source.status_code(),
247
248 Error::ToJson { .. }
249 | Error::DeleteState { .. }
250 | Error::FromJson { .. }
251 | Error::WaitWatcher { .. }
252 | Error::RetryLater { .. }
253 | Error::RollbackProcedureRecovered { .. }
254 | Error::TooManyRunningProcedures { .. }
255 | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
256
257 Error::RetryTimesExceeded { .. }
258 | Error::RollbackTimesExceeded { .. }
259 | Error::ManagerNotStart { .. } => StatusCode::IllegalState,
260
261 Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
262 Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
263 StatusCode::InvalidArguments
264 }
265 Error::ProcedurePanic { .. }
266 | Error::ParseSegmentKey { .. }
267 | Error::Unexpected { .. }
268 | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
269 Error::ProcedureExec { source, .. } => source.status_code(),
270 Error::StartRemoveOutdatedMetaTask { source, .. }
271 | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
272 }
273 }
274
275 fn as_any(&self) -> &dyn Any {
276 self
277 }
278}
279
280impl Error {
281 pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
283 Error::External {
284 source: BoxedError::new(err),
285 clean_poisons: false,
286 }
287 }
288
289 pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
291 Error::External {
292 source: BoxedError::new(err),
293 clean_poisons: true,
294 }
295 }
296
297 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
299 Error::RetryLater {
300 source: BoxedError::new(err),
301 }
302 }
303
304 pub fn is_retry_later(&self) -> bool {
306 matches!(self, Error::RetryLater { .. })
307 }
308
309 pub fn need_clean_poisons(&self) -> bool {
311 matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
312 }
313
314 pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
317 if err.status_code().is_retryable() {
318 Error::retry_later(err)
319 } else {
320 Error::external(err)
321 }
322 }
323}