common_procedure/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use snafu::{Location, Snafu};
22
23use crate::procedure::ProcedureId;
24use crate::PoisonKey;
25
26/// Procedure error.
27#[derive(Snafu)]
28#[snafu(visibility(pub))]
29#[stack_trace_debug]
30pub enum Error {
31    #[snafu(display(
32        "Failed to execute procedure due to external error, clean poisons: {}",
33        clean_poisons
34    ))]
35    External {
36        source: BoxedError,
37        clean_poisons: bool,
38    },
39
40    #[snafu(display("Loader {} is already registered", name))]
41    LoaderConflict {
42        name: String,
43        #[snafu(implicit)]
44        location: Location,
45    },
46
47    #[snafu(display("Procedure Manager is stopped"))]
48    ManagerNotStart {
49        #[snafu(implicit)]
50        location: Location,
51    },
52
53    #[snafu(display("Failed to serialize to json"))]
54    ToJson {
55        #[snafu(source)]
56        error: serde_json::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Procedure {} already exists", procedure_id))]
62    DuplicateProcedure {
63        procedure_id: ProcedureId,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
69    TooManyRunningProcedures {
70        max_running_procedures: usize,
71        #[snafu(implicit)]
72        location: Location,
73    },
74
75    #[snafu(display("Failed to put state, key: '{key}'"))]
76    PutState {
77        key: String,
78        #[snafu(implicit)]
79        location: Location,
80        source: BoxedError,
81    },
82
83    #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
84    PutPoison {
85        key: String,
86        token: String,
87        #[snafu(implicit)]
88        location: Location,
89        source: BoxedError,
90    },
91
92    #[snafu(display("Failed to get poison, key: '{key}'"))]
93    GetPoison {
94        key: String,
95        #[snafu(implicit)]
96        location: Location,
97        source: BoxedError,
98    },
99
100    #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
101    DeletePoison {
102        key: String,
103        token: String,
104        #[snafu(implicit)]
105        location: Location,
106        source: BoxedError,
107    },
108
109    #[snafu(display("Failed to delete {}", key))]
110    DeleteState {
111        key: String,
112        #[snafu(source)]
113        error: object_store::Error,
114    },
115
116    #[snafu(display("Failed to delete keys: '{keys}'"))]
117    DeleteStates {
118        keys: String,
119        #[snafu(implicit)]
120        location: Location,
121        source: BoxedError,
122    },
123
124    #[snafu(display("Failed to list state, path: '{path}'"))]
125    ListState {
126        path: String,
127        #[snafu(implicit)]
128        location: Location,
129        source: BoxedError,
130    },
131
132    #[snafu(display("Failed to deserialize from json"))]
133    FromJson {
134        #[snafu(source)]
135        error: serde_json::Error,
136        #[snafu(implicit)]
137        location: Location,
138    },
139
140    #[snafu(display("Procedure exec failed"))]
141    RetryLater { source: BoxedError },
142
143    #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
144    ProcedurePanic { procedure_id: ProcedureId },
145
146    #[snafu(display("Failed to wait watcher"))]
147    WaitWatcher {
148        #[snafu(source)]
149        error: tokio::sync::watch::error::RecvError,
150        #[snafu(implicit)]
151        location: Location,
152    },
153
154    #[snafu(display("Failed to execute procedure"))]
155    ProcedureExec {
156        source: Arc<Error>,
157        #[snafu(implicit)]
158        location: Location,
159    },
160
161    #[snafu(display("Rollback Procedure recovered: {error}"))]
162    RollbackProcedureRecovered {
163        error: String,
164        #[snafu(implicit)]
165        location: Location,
166    },
167
168    #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
169    RetryTimesExceeded {
170        source: Arc<Error>,
171        procedure_id: ProcedureId,
172    },
173
174    #[snafu(display(
175        "Procedure rollback exceeded max times, procedure_id: {}",
176        procedure_id
177    ))]
178    RollbackTimesExceeded {
179        source: Arc<Error>,
180        procedure_id: ProcedureId,
181    },
182
183    #[snafu(display("Failed to start the remove_outdated_meta method, error"))]
184    StartRemoveOutdatedMetaTask {
185        source: common_runtime::error::Error,
186        #[snafu(implicit)]
187        location: Location,
188    },
189
190    #[snafu(display("Failed to stop the remove_outdated_meta method, error"))]
191    StopRemoveOutdatedMetaTask {
192        source: common_runtime::error::Error,
193        #[snafu(implicit)]
194        location: Location,
195    },
196
197    #[snafu(display("Failed to parse segment key: {key}"))]
198    ParseSegmentKey {
199        #[snafu(implicit)]
200        location: Location,
201        key: String,
202        #[snafu(source)]
203        error: std::num::ParseIntError,
204    },
205
206    #[snafu(display("Unexpected: {err_msg}"))]
207    Unexpected {
208        #[snafu(implicit)]
209        location: Location,
210        err_msg: String,
211    },
212
213    #[snafu(display("Not support to rollback the procedure"))]
214    RollbackNotSupported {
215        #[snafu(implicit)]
216        location: Location,
217    },
218
219    #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
220    ProcedureNotFound {
221        procedure_id: ProcedureId,
222        #[snafu(implicit)]
223        location: Location,
224    },
225
226    #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
227    PoisonKeyNotDefined {
228        key: PoisonKey,
229        procedure_id: ProcedureId,
230        #[snafu(implicit)]
231        location: Location,
232    },
233}
234
235pub type Result<T> = std::result::Result<T, Error>;
236
237impl ErrorExt for Error {
238    fn status_code(&self) -> StatusCode {
239        match self {
240            Error::External { source, .. }
241            | Error::PutState { source, .. }
242            | Error::DeleteStates { source, .. }
243            | Error::ListState { source, .. }
244            | Error::PutPoison { source, .. }
245            | Error::DeletePoison { source, .. }
246            | Error::GetPoison { source, .. } => source.status_code(),
247
248            Error::ToJson { .. }
249            | Error::DeleteState { .. }
250            | Error::FromJson { .. }
251            | Error::WaitWatcher { .. }
252            | Error::RetryLater { .. }
253            | Error::RollbackProcedureRecovered { .. }
254            | Error::TooManyRunningProcedures { .. }
255            | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
256
257            Error::RetryTimesExceeded { .. }
258            | Error::RollbackTimesExceeded { .. }
259            | Error::ManagerNotStart { .. } => StatusCode::IllegalState,
260
261            Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
262            Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
263                StatusCode::InvalidArguments
264            }
265            Error::ProcedurePanic { .. }
266            | Error::ParseSegmentKey { .. }
267            | Error::Unexpected { .. }
268            | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
269            Error::ProcedureExec { source, .. } => source.status_code(),
270            Error::StartRemoveOutdatedMetaTask { source, .. }
271            | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
272        }
273    }
274
275    fn as_any(&self) -> &dyn Any {
276        self
277    }
278}
279
280impl Error {
281    /// Creates a new [Error::External] error from source `err`.
282    pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
283        Error::External {
284            source: BoxedError::new(err),
285            clean_poisons: false,
286        }
287    }
288
289    /// Creates a new [Error::External] error from source `err` and clean poisons.
290    pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
291        Error::External {
292            source: BoxedError::new(err),
293            clean_poisons: true,
294        }
295    }
296
297    /// Creates a new [Error::RetryLater] error from source `err`.
298    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
299        Error::RetryLater {
300            source: BoxedError::new(err),
301        }
302    }
303
304    /// Determine whether it is a retry later type through [StatusCode]
305    pub fn is_retry_later(&self) -> bool {
306        matches!(self, Error::RetryLater { .. })
307    }
308
309    /// Determine whether it needs to clean poisons.
310    pub fn need_clean_poisons(&self) -> bool {
311        matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
312    }
313
314    /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
315    /// to its [StatusCode].
316    pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
317        if err.status_code().is_retryable() {
318            Error::retry_later(err)
319        } else {
320            Error::external(err)
321        }
322    }
323}