common_procedure/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use snafu::{Location, Snafu};
22
23use crate::procedure::ProcedureId;
24use crate::PoisonKey;
25
26/// Procedure error.
27#[derive(Snafu)]
28#[snafu(visibility(pub))]
29#[stack_trace_debug]
30pub enum Error {
31    #[snafu(display(
32        "Failed to execute procedure due to external error, clean poisons: {}",
33        clean_poisons
34    ))]
35    External {
36        source: BoxedError,
37        clean_poisons: bool,
38    },
39
40    #[snafu(display("Loader {} is already registered", name))]
41    LoaderConflict {
42        name: String,
43        #[snafu(implicit)]
44        location: Location,
45    },
46
47    #[snafu(display("Procedure Manager is stopped"))]
48    ManagerNotStart {
49        #[snafu(implicit)]
50        location: Location,
51    },
52
53    #[snafu(display("Failed to serialize to json"))]
54    ToJson {
55        #[snafu(source)]
56        error: serde_json::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Procedure {} already exists", procedure_id))]
62    DuplicateProcedure {
63        procedure_id: ProcedureId,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
69    TooManyRunningProcedures {
70        max_running_procedures: usize,
71        #[snafu(implicit)]
72        location: Location,
73    },
74
75    #[snafu(display("Failed to put state, key: '{key}'"))]
76    PutState {
77        key: String,
78        #[snafu(implicit)]
79        location: Location,
80        source: BoxedError,
81    },
82
83    #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
84    PutPoison {
85        key: String,
86        token: String,
87        #[snafu(implicit)]
88        location: Location,
89        source: BoxedError,
90    },
91
92    #[snafu(display("Failed to get poison, key: '{key}'"))]
93    GetPoison {
94        key: String,
95        #[snafu(implicit)]
96        location: Location,
97        source: BoxedError,
98    },
99
100    #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
101    DeletePoison {
102        key: String,
103        token: String,
104        #[snafu(implicit)]
105        location: Location,
106        source: BoxedError,
107    },
108
109    #[snafu(display("Failed to delete {}", key))]
110    DeleteState {
111        key: String,
112        #[snafu(source)]
113        error: object_store::Error,
114    },
115
116    #[snafu(display("Failed to delete keys: '{keys}'"))]
117    DeleteStates {
118        keys: String,
119        #[snafu(implicit)]
120        location: Location,
121        source: BoxedError,
122    },
123
124    #[snafu(display("Failed to list state, path: '{path}'"))]
125    ListState {
126        path: String,
127        #[snafu(implicit)]
128        location: Location,
129        source: BoxedError,
130    },
131
132    #[snafu(display("Failed to deserialize from json"))]
133    FromJson {
134        #[snafu(source)]
135        error: serde_json::Error,
136        #[snafu(implicit)]
137        location: Location,
138    },
139
140    #[snafu(display("Procedure exec failed"))]
141    RetryLater {
142        source: BoxedError,
143        clean_poisons: bool,
144    },
145
146    #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
147    ProcedurePanic { procedure_id: ProcedureId },
148
149    #[snafu(display("Failed to wait watcher"))]
150    WaitWatcher {
151        #[snafu(source)]
152        error: tokio::sync::watch::error::RecvError,
153        #[snafu(implicit)]
154        location: Location,
155    },
156
157    #[snafu(display("Failed to execute procedure"))]
158    ProcedureExec {
159        source: Arc<Error>,
160        #[snafu(implicit)]
161        location: Location,
162    },
163
164    #[snafu(display("Rollback Procedure recovered: {error}"))]
165    RollbackProcedureRecovered {
166        error: String,
167        #[snafu(implicit)]
168        location: Location,
169    },
170
171    #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
172    RetryTimesExceeded {
173        source: Arc<Error>,
174        procedure_id: ProcedureId,
175    },
176
177    #[snafu(display(
178        "Procedure rollback exceeded max times, procedure_id: {}",
179        procedure_id
180    ))]
181    RollbackTimesExceeded {
182        source: Arc<Error>,
183        procedure_id: ProcedureId,
184    },
185
186    #[snafu(display("Failed to start the remove_outdated_meta method, error"))]
187    StartRemoveOutdatedMetaTask {
188        source: common_runtime::error::Error,
189        #[snafu(implicit)]
190        location: Location,
191    },
192
193    #[snafu(display("Failed to stop the remove_outdated_meta method, error"))]
194    StopRemoveOutdatedMetaTask {
195        source: common_runtime::error::Error,
196        #[snafu(implicit)]
197        location: Location,
198    },
199
200    #[snafu(display("Failed to parse segment key: {key}"))]
201    ParseSegmentKey {
202        #[snafu(implicit)]
203        location: Location,
204        key: String,
205        #[snafu(source)]
206        error: std::num::ParseIntError,
207    },
208
209    #[snafu(display("Unexpected: {err_msg}"))]
210    Unexpected {
211        #[snafu(implicit)]
212        location: Location,
213        err_msg: String,
214    },
215
216    #[snafu(display("Not support to rollback the procedure"))]
217    RollbackNotSupported {
218        #[snafu(implicit)]
219        location: Location,
220    },
221
222    #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
223    ProcedureNotFound {
224        procedure_id: ProcedureId,
225        #[snafu(implicit)]
226        location: Location,
227    },
228
229    #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
230    PoisonKeyNotDefined {
231        key: PoisonKey,
232        procedure_id: ProcedureId,
233        #[snafu(implicit)]
234        location: Location,
235    },
236}
237
/// Result alias whose error type is this module's [Error].
pub type Result<T> = std::result::Result<T, Error>;
239
240impl ErrorExt for Error {
241    fn status_code(&self) -> StatusCode {
242        match self {
243            Error::External { source, .. }
244            | Error::PutState { source, .. }
245            | Error::DeleteStates { source, .. }
246            | Error::ListState { source, .. }
247            | Error::PutPoison { source, .. }
248            | Error::DeletePoison { source, .. }
249            | Error::GetPoison { source, .. } => source.status_code(),
250
251            Error::ToJson { .. }
252            | Error::DeleteState { .. }
253            | Error::FromJson { .. }
254            | Error::WaitWatcher { .. }
255            | Error::RetryLater { .. }
256            | Error::RollbackProcedureRecovered { .. }
257            | Error::TooManyRunningProcedures { .. }
258            | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
259
260            Error::RetryTimesExceeded { .. }
261            | Error::RollbackTimesExceeded { .. }
262            | Error::ManagerNotStart { .. } => StatusCode::IllegalState,
263
264            Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
265            Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
266                StatusCode::InvalidArguments
267            }
268            Error::ProcedurePanic { .. }
269            | Error::ParseSegmentKey { .. }
270            | Error::Unexpected { .. }
271            | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
272            Error::ProcedureExec { source, .. } => source.status_code(),
273            Error::StartRemoveOutdatedMetaTask { source, .. }
274            | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
275        }
276    }
277
278    fn as_any(&self) -> &dyn Any {
279        self
280    }
281}
282
283impl Error {
284    /// Creates a new [Error::External] error from source `err`.
285    pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
286        Error::External {
287            source: BoxedError::new(err),
288            clean_poisons: false,
289        }
290    }
291
292    /// Creates a new [Error::External] error from source `err` and clean poisons.
293    pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
294        Error::External {
295            source: BoxedError::new(err),
296            clean_poisons: true,
297        }
298    }
299
300    /// Creates a new [Error::RetryLater] error from source `err`.
301    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
302        Error::RetryLater {
303            source: BoxedError::new(err),
304            clean_poisons: false,
305        }
306    }
307
308    /// Creates a new [Error::RetryLater] error from source `err` and clean poisons.
309    pub fn retry_later_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
310        Error::RetryLater {
311            source: BoxedError::new(err),
312            clean_poisons: true,
313        }
314    }
315
316    /// Determine whether it is a retry later type through [StatusCode]
317    pub fn is_retry_later(&self) -> bool {
318        matches!(self, Error::RetryLater { .. })
319    }
320
321    /// Determine whether it needs to clean poisons.
322    pub fn need_clean_poisons(&self) -> bool {
323        matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
324            || matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
325    }
326
327    /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
328    /// to its [StatusCode].
329    pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
330        if err.status_code().is_retryable() {
331            Error::retry_later(err)
332        } else {
333            Error::external(err)
334        }
335    }
336}