1use std::any::Any;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use snafu::{Location, Snafu};
22
23use crate::procedure::ProcedureId;
24use crate::PoisonKey;
25
26#[derive(Snafu)]
28#[snafu(visibility(pub))]
29#[stack_trace_debug]
30pub enum Error {
31 #[snafu(display(
32 "Failed to execute procedure due to external error, clean poisons: {}",
33 clean_poisons
34 ))]
35 External {
36 source: BoxedError,
37 clean_poisons: bool,
38 },
39
40 #[snafu(display("Loader {} is already registered", name))]
41 LoaderConflict {
42 name: String,
43 #[snafu(implicit)]
44 location: Location,
45 },
46
47 #[snafu(display("Procedure Manager is stopped"))]
48 ManagerNotStart {
49 #[snafu(implicit)]
50 location: Location,
51 },
52
53 #[snafu(display("Failed to serialize to json"))]
54 ToJson {
55 #[snafu(source)]
56 error: serde_json::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Procedure {} already exists", procedure_id))]
62 DuplicateProcedure {
63 procedure_id: ProcedureId,
64 #[snafu(implicit)]
65 location: Location,
66 },
67
68 #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
69 TooManyRunningProcedures {
70 max_running_procedures: usize,
71 #[snafu(implicit)]
72 location: Location,
73 },
74
75 #[snafu(display("Failed to put state, key: '{key}'"))]
76 PutState {
77 key: String,
78 #[snafu(implicit)]
79 location: Location,
80 source: BoxedError,
81 },
82
83 #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
84 PutPoison {
85 key: String,
86 token: String,
87 #[snafu(implicit)]
88 location: Location,
89 source: BoxedError,
90 },
91
92 #[snafu(display("Failed to get poison, key: '{key}'"))]
93 GetPoison {
94 key: String,
95 #[snafu(implicit)]
96 location: Location,
97 source: BoxedError,
98 },
99
100 #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
101 DeletePoison {
102 key: String,
103 token: String,
104 #[snafu(implicit)]
105 location: Location,
106 source: BoxedError,
107 },
108
109 #[snafu(display("Failed to delete {}", key))]
110 DeleteState {
111 key: String,
112 #[snafu(source)]
113 error: object_store::Error,
114 },
115
116 #[snafu(display("Failed to delete keys: '{keys}'"))]
117 DeleteStates {
118 keys: String,
119 #[snafu(implicit)]
120 location: Location,
121 source: BoxedError,
122 },
123
124 #[snafu(display("Failed to list state, path: '{path}'"))]
125 ListState {
126 path: String,
127 #[snafu(implicit)]
128 location: Location,
129 source: BoxedError,
130 },
131
132 #[snafu(display("Failed to deserialize from json"))]
133 FromJson {
134 #[snafu(source)]
135 error: serde_json::Error,
136 #[snafu(implicit)]
137 location: Location,
138 },
139
140 #[snafu(display("Procedure exec failed"))]
141 RetryLater {
142 source: BoxedError,
143 clean_poisons: bool,
144 },
145
146 #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
147 ProcedurePanic { procedure_id: ProcedureId },
148
149 #[snafu(display("Failed to wait watcher"))]
150 WaitWatcher {
151 #[snafu(source)]
152 error: tokio::sync::watch::error::RecvError,
153 #[snafu(implicit)]
154 location: Location,
155 },
156
157 #[snafu(display("Failed to execute procedure"))]
158 ProcedureExec {
159 source: Arc<Error>,
160 #[snafu(implicit)]
161 location: Location,
162 },
163
164 #[snafu(display("Rollback Procedure recovered: {error}"))]
165 RollbackProcedureRecovered {
166 error: String,
167 #[snafu(implicit)]
168 location: Location,
169 },
170
171 #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
172 RetryTimesExceeded {
173 source: Arc<Error>,
174 procedure_id: ProcedureId,
175 },
176
177 #[snafu(display(
178 "Procedure rollback exceeded max times, procedure_id: {}",
179 procedure_id
180 ))]
181 RollbackTimesExceeded {
182 source: Arc<Error>,
183 procedure_id: ProcedureId,
184 },
185
186 #[snafu(display("Failed to start the remove_outdated_meta method, error"))]
187 StartRemoveOutdatedMetaTask {
188 source: common_runtime::error::Error,
189 #[snafu(implicit)]
190 location: Location,
191 },
192
193 #[snafu(display("Failed to stop the remove_outdated_meta method, error"))]
194 StopRemoveOutdatedMetaTask {
195 source: common_runtime::error::Error,
196 #[snafu(implicit)]
197 location: Location,
198 },
199
200 #[snafu(display("Failed to parse segment key: {key}"))]
201 ParseSegmentKey {
202 #[snafu(implicit)]
203 location: Location,
204 key: String,
205 #[snafu(source)]
206 error: std::num::ParseIntError,
207 },
208
209 #[snafu(display("Unexpected: {err_msg}"))]
210 Unexpected {
211 #[snafu(implicit)]
212 location: Location,
213 err_msg: String,
214 },
215
216 #[snafu(display("Not support to rollback the procedure"))]
217 RollbackNotSupported {
218 #[snafu(implicit)]
219 location: Location,
220 },
221
222 #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
223 ProcedureNotFound {
224 procedure_id: ProcedureId,
225 #[snafu(implicit)]
226 location: Location,
227 },
228
229 #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
230 PoisonKeyNotDefined {
231 key: PoisonKey,
232 procedure_id: ProcedureId,
233 #[snafu(implicit)]
234 location: Location,
235 },
236}
237
238pub type Result<T> = std::result::Result<T, Error>;
239
240impl ErrorExt for Error {
241 fn status_code(&self) -> StatusCode {
242 match self {
243 Error::External { source, .. }
244 | Error::PutState { source, .. }
245 | Error::DeleteStates { source, .. }
246 | Error::ListState { source, .. }
247 | Error::PutPoison { source, .. }
248 | Error::DeletePoison { source, .. }
249 | Error::GetPoison { source, .. } => source.status_code(),
250
251 Error::ToJson { .. }
252 | Error::DeleteState { .. }
253 | Error::FromJson { .. }
254 | Error::WaitWatcher { .. }
255 | Error::RetryLater { .. }
256 | Error::RollbackProcedureRecovered { .. }
257 | Error::TooManyRunningProcedures { .. }
258 | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
259
260 Error::RetryTimesExceeded { .. }
261 | Error::RollbackTimesExceeded { .. }
262 | Error::ManagerNotStart { .. } => StatusCode::IllegalState,
263
264 Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
265 Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
266 StatusCode::InvalidArguments
267 }
268 Error::ProcedurePanic { .. }
269 | Error::ParseSegmentKey { .. }
270 | Error::Unexpected { .. }
271 | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
272 Error::ProcedureExec { source, .. } => source.status_code(),
273 Error::StartRemoveOutdatedMetaTask { source, .. }
274 | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
275 }
276 }
277
278 fn as_any(&self) -> &dyn Any {
279 self
280 }
281}
282
283impl Error {
284 pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
286 Error::External {
287 source: BoxedError::new(err),
288 clean_poisons: false,
289 }
290 }
291
292 pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
294 Error::External {
295 source: BoxedError::new(err),
296 clean_poisons: true,
297 }
298 }
299
300 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
302 Error::RetryLater {
303 source: BoxedError::new(err),
304 clean_poisons: false,
305 }
306 }
307
308 pub fn retry_later_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
310 Error::RetryLater {
311 source: BoxedError::new(err),
312 clean_poisons: true,
313 }
314 }
315
316 pub fn is_retry_later(&self) -> bool {
318 matches!(self, Error::RetryLater { .. })
319 }
320
321 pub fn need_clean_poisons(&self) -> bool {
323 matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
324 || matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
325 }
326
327 pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
330 if err.status_code().is_retryable() {
331 Error::retry_later(err)
332 } else {
333 Error::external(err)
334 }
335 }
336}