common_procedure/
error.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::any::Any;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use snafu::{Location, Snafu};
22
23use crate::procedure::ProcedureId;
24use crate::PoisonKey;
25
26/// Procedure error.
27#[derive(Snafu)]
28#[snafu(visibility(pub))]
29#[stack_trace_debug]
30pub enum Error {
31    #[snafu(display("Failed to check procedure manager status"))]
32    CheckStatus {
33        source: BoxedError,
34        #[snafu(implicit)]
35        location: Location,
36    },
37
38    #[snafu(display("Manager is pasued"))]
39    ManagerPasued {
40        #[snafu(implicit)]
41        location: Location,
42    },
43
44    #[snafu(display(
45        "Failed to execute procedure due to external error, clean poisons: {}",
46        clean_poisons
47    ))]
48    External {
49        source: BoxedError,
50        clean_poisons: bool,
51    },
52
53    #[snafu(display("Loader {} is already registered", name))]
54    LoaderConflict {
55        name: String,
56        #[snafu(implicit)]
57        location: Location,
58    },
59
60    #[snafu(display("Procedure Manager is stopped"))]
61    ManagerNotStart {
62        #[snafu(implicit)]
63        location: Location,
64    },
65
66    #[snafu(display("Failed to serialize to json"))]
67    ToJson {
68        #[snafu(source)]
69        error: serde_json::Error,
70        #[snafu(implicit)]
71        location: Location,
72    },
73
74    #[snafu(display("Procedure {} already exists", procedure_id))]
75    DuplicateProcedure {
76        procedure_id: ProcedureId,
77        #[snafu(implicit)]
78        location: Location,
79    },
80
81    #[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
82    TooManyRunningProcedures {
83        max_running_procedures: usize,
84        #[snafu(implicit)]
85        location: Location,
86    },
87
88    #[snafu(display("Failed to put state, key: '{key}'"))]
89    PutState {
90        key: String,
91        #[snafu(implicit)]
92        location: Location,
93        source: BoxedError,
94    },
95
96    #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
97    PutPoison {
98        key: String,
99        token: String,
100        #[snafu(implicit)]
101        location: Location,
102        source: BoxedError,
103    },
104
105    #[snafu(display("Failed to get poison, key: '{key}'"))]
106    GetPoison {
107        key: String,
108        #[snafu(implicit)]
109        location: Location,
110        source: BoxedError,
111    },
112
113    #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
114    DeletePoison {
115        key: String,
116        token: String,
117        #[snafu(implicit)]
118        location: Location,
119        source: BoxedError,
120    },
121
122    #[snafu(display("Failed to delete {}", key))]
123    DeleteState {
124        key: String,
125        #[snafu(source)]
126        error: object_store::Error,
127    },
128
129    #[snafu(display("Failed to delete keys: '{keys}'"))]
130    DeleteStates {
131        keys: String,
132        #[snafu(implicit)]
133        location: Location,
134        source: BoxedError,
135    },
136
137    #[snafu(display("Failed to list state, path: '{path}'"))]
138    ListState {
139        path: String,
140        #[snafu(implicit)]
141        location: Location,
142        source: BoxedError,
143    },
144
145    #[snafu(display("Failed to deserialize from json"))]
146    FromJson {
147        #[snafu(source)]
148        error: serde_json::Error,
149        #[snafu(implicit)]
150        location: Location,
151    },
152
153    #[snafu(display("Procedure exec failed"))]
154    RetryLater {
155        source: BoxedError,
156        clean_poisons: bool,
157    },
158
159    #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
160    ProcedurePanic { procedure_id: ProcedureId },
161
162    #[snafu(display("Failed to wait watcher"))]
163    WaitWatcher {
164        #[snafu(source)]
165        error: tokio::sync::watch::error::RecvError,
166        #[snafu(implicit)]
167        location: Location,
168    },
169
170    #[snafu(display("Failed to execute procedure"))]
171    ProcedureExec {
172        source: Arc<Error>,
173        #[snafu(implicit)]
174        location: Location,
175    },
176
177    #[snafu(display("Rollback Procedure recovered: {error}"))]
178    RollbackProcedureRecovered {
179        error: String,
180        #[snafu(implicit)]
181        location: Location,
182    },
183
184    #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
185    RetryTimesExceeded {
186        source: Arc<Error>,
187        procedure_id: ProcedureId,
188    },
189
190    #[snafu(display(
191        "Procedure rollback exceeded max times, procedure_id: {}",
192        procedure_id
193    ))]
194    RollbackTimesExceeded {
195        source: Arc<Error>,
196        procedure_id: ProcedureId,
197    },
198
199    #[snafu(display("Failed to start the remove_outdated_meta method, error"))]
200    StartRemoveOutdatedMetaTask {
201        source: common_runtime::error::Error,
202        #[snafu(implicit)]
203        location: Location,
204    },
205
206    #[snafu(display("Failed to stop the remove_outdated_meta method, error"))]
207    StopRemoveOutdatedMetaTask {
208        source: common_runtime::error::Error,
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Failed to parse segment key: {key}"))]
214    ParseSegmentKey {
215        #[snafu(implicit)]
216        location: Location,
217        key: String,
218        #[snafu(source)]
219        error: std::num::ParseIntError,
220    },
221
222    #[snafu(display("Unexpected: {err_msg}"))]
223    Unexpected {
224        #[snafu(implicit)]
225        location: Location,
226        err_msg: String,
227    },
228
229    #[snafu(display("Not support to rollback the procedure"))]
230    RollbackNotSupported {
231        #[snafu(implicit)]
232        location: Location,
233    },
234
235    #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
236    ProcedureNotFound {
237        procedure_id: ProcedureId,
238        #[snafu(implicit)]
239        location: Location,
240    },
241
242    #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
243    PoisonKeyNotDefined {
244        key: PoisonKey,
245        procedure_id: ProcedureId,
246        #[snafu(implicit)]
247        location: Location,
248    },
249}
250
251pub type Result<T> = std::result::Result<T, Error>;
252
253impl ErrorExt for Error {
254    fn status_code(&self) -> StatusCode {
255        match self {
256            Error::External { source, .. }
257            | Error::PutState { source, .. }
258            | Error::DeleteStates { source, .. }
259            | Error::ListState { source, .. }
260            | Error::PutPoison { source, .. }
261            | Error::DeletePoison { source, .. }
262            | Error::GetPoison { source, .. }
263            | Error::CheckStatus { source, .. } => source.status_code(),
264
265            Error::ToJson { .. }
266            | Error::DeleteState { .. }
267            | Error::FromJson { .. }
268            | Error::WaitWatcher { .. }
269            | Error::RetryLater { .. }
270            | Error::RollbackProcedureRecovered { .. }
271            | Error::TooManyRunningProcedures { .. }
272            | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
273
274            Error::RetryTimesExceeded { .. }
275            | Error::RollbackTimesExceeded { .. }
276            | Error::ManagerNotStart { .. }
277            | Error::ManagerPasued { .. } => StatusCode::IllegalState,
278
279            Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
280            Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
281                StatusCode::InvalidArguments
282            }
283            Error::ProcedurePanic { .. }
284            | Error::ParseSegmentKey { .. }
285            | Error::Unexpected { .. }
286            | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
287            Error::ProcedureExec { source, .. } => source.status_code(),
288            Error::StartRemoveOutdatedMetaTask { source, .. }
289            | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
290        }
291    }
292
293    fn as_any(&self) -> &dyn Any {
294        self
295    }
296}
297
298impl Error {
299    /// Creates a new [Error::External] error from source `err`.
300    pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
301        Error::External {
302            source: BoxedError::new(err),
303            clean_poisons: false,
304        }
305    }
306
307    /// Creates a new [Error::External] error from source `err` and clean poisons.
308    pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
309        Error::External {
310            source: BoxedError::new(err),
311            clean_poisons: true,
312        }
313    }
314
315    /// Creates a new [Error::RetryLater] error from source `err`.
316    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
317        Error::RetryLater {
318            source: BoxedError::new(err),
319            clean_poisons: false,
320        }
321    }
322
323    /// Creates a new [Error::RetryLater] error from source `err` and clean poisons.
324    pub fn retry_later_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
325        Error::RetryLater {
326            source: BoxedError::new(err),
327            clean_poisons: true,
328        }
329    }
330
331    /// Determine whether it is a retry later type through [StatusCode]
332    pub fn is_retry_later(&self) -> bool {
333        matches!(self, Error::RetryLater { .. })
334    }
335
336    /// Determine whether it needs to clean poisons.
337    pub fn need_clean_poisons(&self) -> bool {
338        matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
339            || matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
340    }
341
342    /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
343    /// to its [StatusCode].
344    pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
345        if err.status_code().is_retryable() {
346            Error::retry_later(err)
347        } else {
348            Error::external(err)
349        }
350    }
351}