flow/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Error definition for flow module
16
17use std::any::Any;
18
19use arrow_schema::ArrowError;
20use common_error::ext::BoxedError;
21use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
22use common_macro::stack_trace_debug;
23use common_telemetry::common_error::ext::ErrorExt;
24use common_telemetry::common_error::status_code::StatusCode;
25use snafu::{Location, ResultExt, Snafu};
26use tonic::metadata::MetadataMap;
27
28use crate::expr::EvalError;
29use crate::FlowId;
30
31/// This error is used to represent all possible errors that can occur in the flow module.
32#[derive(Snafu)]
33#[snafu(visibility(pub))]
34#[stack_trace_debug]
35pub enum Error {
36    #[snafu(display(
37        "Failed to insert into flow: region_id={}, flow_ids={:?}",
38        region_id,
39        flow_ids
40    ))]
41    InsertIntoFlow {
42        region_id: u64,
43        flow_ids: Vec<u64>,
44        source: BoxedError,
45        #[snafu(implicit)]
46        location: Location,
47    },
48
49    #[snafu(display("Flow engine is still recovering"))]
50    FlowNotRecovered {
51        #[snafu(implicit)]
52        location: Location,
53    },
54
55    #[snafu(display("Error encountered while creating flow: {sql}"))]
56    CreateFlow {
57        sql: String,
58        source: BoxedError,
59        #[snafu(implicit)]
60        location: Location,
61    },
62
63    #[snafu(display("Time error"))]
64    Time {
65        source: common_time::error::Error,
66        #[snafu(implicit)]
67        location: Location,
68    },
69
70    #[snafu(display(
71        "No available frontend found after timeout: {timeout:?}, context: {context}"
72    ))]
73    NoAvailableFrontend {
74        timeout: std::time::Duration,
75        context: String,
76        #[snafu(implicit)]
77        location: Location,
78    },
79
80    #[snafu(display("External error"))]
81    External {
82        source: BoxedError,
83        #[snafu(implicit)]
84        location: Location,
85    },
86
87    #[snafu(display("Internal error"))]
88    Internal {
89        reason: String,
90        #[snafu(implicit)]
91        location: Location,
92    },
93
94    /// TODO(discord9): add detailed location of column
95    #[snafu(display("Failed to eval stream"))]
96    Eval {
97        source: EvalError,
98        #[snafu(implicit)]
99        location: Location,
100    },
101
102    #[snafu(display("Table not found: {name}"))]
103    TableNotFound {
104        name: String,
105        #[snafu(implicit)]
106        location: Location,
107    },
108
109    #[snafu(display("Table not found: {msg}, meta error: {source}"))]
110    TableNotFoundMeta {
111        source: common_meta::error::Error,
112        msg: String,
113        #[snafu(implicit)]
114        location: Location,
115    },
116
117    #[snafu(display("Flow not found, id={id}"))]
118    FlowNotFound {
119        id: FlowId,
120        #[snafu(implicit)]
121        location: Location,
122    },
123
124    #[snafu(display("Failed to list flows in flownode={id:?}"))]
125    ListFlows {
126        id: Option<common_meta::FlownodeId>,
127        source: common_meta::error::Error,
128        #[snafu(implicit)]
129        location: Location,
130    },
131
132    #[snafu(display("Flow already exist, id={id}"))]
133    FlowAlreadyExist {
134        id: FlowId,
135        #[snafu(implicit)]
136        location: Location,
137    },
138
139    #[snafu(display("Failed to join task"))]
140    JoinTask {
141        #[snafu(source)]
142        error: tokio::task::JoinError,
143        #[snafu(implicit)]
144        location: Location,
145    },
146
147    #[snafu(display("Invalid query: {reason}"))]
148    InvalidQuery {
149        reason: String,
150        #[snafu(implicit)]
151        location: Location,
152    },
153
154    #[snafu(display("Not implement in flow: {reason}"))]
155    NotImplemented {
156        reason: String,
157        #[snafu(implicit)]
158        location: Location,
159    },
160
161    #[snafu(display("Invalid auth config"))]
162    IllegalAuthConfig { source: auth::error::Error },
163
164    #[snafu(display("Flow plan error: {reason}"))]
165    Plan {
166        reason: String,
167        #[snafu(implicit)]
168        location: Location,
169    },
170
171    #[snafu(display("Unsupported: {reason}"))]
172    Unsupported {
173        reason: String,
174        #[snafu(implicit)]
175        location: Location,
176    },
177
178    #[snafu(display("Unsupported temporal filter: {reason}"))]
179    UnsupportedTemporalFilter {
180        reason: String,
181        #[snafu(implicit)]
182        location: Location,
183    },
184
185    #[snafu(display("Datatypes error: {source} with extra message: {extra}"))]
186    Datatypes {
187        source: datatypes::Error,
188        extra: String,
189        #[snafu(implicit)]
190        location: Location,
191    },
192
193    #[snafu(display("Arrow error: {raw:?} in context: {context}"))]
194    Arrow {
195        #[snafu(source)]
196        raw: ArrowError,
197        context: String,
198        #[snafu(implicit)]
199        location: Location,
200    },
201
202    #[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
203    Datafusion {
204        #[snafu(source)]
205        raw: datafusion_common::DataFusionError,
206        context: String,
207        #[snafu(implicit)]
208        location: Location,
209    },
210
211    #[snafu(display("Unexpected: {reason}"))]
212    Unexpected {
213        reason: String,
214        #[snafu(implicit)]
215        location: Location,
216    },
217
218    #[snafu(display("Illegal check task state: {reason}"))]
219    IllegalCheckTaskState {
220        reason: String,
221        #[snafu(implicit)]
222        location: Location,
223    },
224
225    #[snafu(display(
226        "Failed to sync with check task for flow {} with allow_drop={}",
227        flow_id,
228        allow_drop
229    ))]
230    SyncCheckTask {
231        flow_id: FlowId,
232        allow_drop: bool,
233        #[snafu(implicit)]
234        location: Location,
235    },
236
237    #[snafu(display("Failed to start server"))]
238    StartServer {
239        #[snafu(implicit)]
240        location: Location,
241        source: servers::error::Error,
242    },
243
244    #[snafu(display("Failed to shutdown server"))]
245    ShutdownServer {
246        #[snafu(implicit)]
247        location: Location,
248        source: servers::error::Error,
249    },
250
251    #[snafu(display("Failed to initialize meta client"))]
252    MetaClientInit {
253        #[snafu(implicit)]
254        location: Location,
255        source: meta_client::error::Error,
256    },
257
258    #[snafu(display("Failed to parse address {}", addr))]
259    ParseAddr {
260        addr: String,
261        #[snafu(source)]
262        error: std::net::AddrParseError,
263    },
264
265    #[snafu(display("Failed to get cache from cache registry: {}", name))]
266    CacheRequired {
267        #[snafu(implicit)]
268        location: Location,
269        name: String,
270    },
271
272    #[snafu(display("Invalid request: {context}"))]
273    InvalidRequest {
274        context: String,
275        source: client::Error,
276        #[snafu(implicit)]
277        location: Location,
278    },
279
280    #[snafu(display("Failed to encode logical plan in substrait"))]
281    SubstraitEncodeLogicalPlan {
282        #[snafu(implicit)]
283        location: Location,
284        source: substrait::error::Error,
285    },
286
287    #[snafu(display("Failed to convert column schema to proto column def"))]
288    ConvertColumnSchema {
289        #[snafu(implicit)]
290        location: Location,
291        source: operator::error::Error,
292    },
293}
294
295/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user
296pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
297    let msg = err.to_string();
298    let last_err_msg = common_error::ext::StackError::last(&err).to_string();
299    let code = err.status_code() as u32;
300    let header = from_err_code_msg_to_header(code, &last_err_msg);
301
302    tonic::Status::with_metadata(
303        tonic::Code::InvalidArgument,
304        msg,
305        MetadataMap::from_headers(header),
306    )
307}
308
309/// Result type for flow module
310pub type Result<T> = std::result::Result<T, Error>;
311
312impl ErrorExt for Error {
313    fn status_code(&self) -> StatusCode {
314        match self {
315            Self::Eval { .. }
316            | Self::JoinTask { .. }
317            | Self::Datafusion { .. }
318            | Self::InsertIntoFlow { .. }
319            | Self::NoAvailableFrontend { .. }
320            | Self::FlowNotRecovered { .. } => StatusCode::Internal,
321            Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
322            Self::TableNotFound { .. }
323            | Self::TableNotFoundMeta { .. }
324            | Self::FlowNotFound { .. }
325            | Self::ListFlows { .. } => StatusCode::TableNotFound,
326            Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
327            Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
328                StatusCode::EngineExecuteQuery
329            }
330            Self::Unexpected { .. }
331            | Self::SyncCheckTask { .. }
332            | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
333            Self::NotImplemented { .. }
334            | Self::UnsupportedTemporalFilter { .. }
335            | Self::Unsupported { .. } => StatusCode::Unsupported,
336            Self::External { source, .. } => source.status_code(),
337            Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
338            Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
339                source.status_code()
340            }
341            Self::MetaClientInit { source, .. } => source.status_code(),
342
343            Self::InvalidQuery { .. }
344            | Self::InvalidRequest { .. }
345            | Self::ParseAddr { .. }
346            | Self::IllegalAuthConfig { .. } => StatusCode::InvalidArguments,
347
348            Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
349
350            Error::ConvertColumnSchema { source, .. } => source.status_code(),
351        }
352    }
353
354    fn as_any(&self) -> &dyn Any {
355        self
356    }
357}
358
359define_into_tonic_status!(Error);
360
361impl From<EvalError> for Error {
362    fn from(e: EvalError) -> Self {
363        Err::<(), _>(e).context(EvalSnafu).unwrap_err()
364    }
365}