flow/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Error definition for flow module
16
17use std::any::Any;
18
19use api::v1::CreateTableExpr;
20use arrow_schema::ArrowError;
21use common_error::ext::BoxedError;
22use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
23use common_macro::stack_trace_debug;
24use common_telemetry::common_error::ext::ErrorExt;
25use common_telemetry::common_error::status_code::StatusCode;
26use snafu::{Location, ResultExt, Snafu};
27use tonic::metadata::MetadataMap;
28
29use crate::FlowId;
30use crate::expr::EvalError;
31
32/// This error is used to represent all possible errors that can occur in the flow module.
33#[derive(Snafu)]
34#[snafu(visibility(pub))]
35#[stack_trace_debug]
36pub enum Error {
37    #[snafu(display(
38        "Failed to insert into flow: region_id={}, flow_ids={:?}",
39        region_id,
40        flow_ids
41    ))]
42    InsertIntoFlow {
43        region_id: u64,
44        flow_ids: Vec<u64>,
45        source: BoxedError,
46        #[snafu(implicit)]
47        location: Location,
48    },
49
50    #[snafu(display("Flow engine is still recovering"))]
51    FlowNotRecovered {
52        #[snafu(implicit)]
53        location: Location,
54    },
55
56    #[snafu(display("Error encountered while creating flow: {sql}"))]
57    CreateFlow {
58        sql: String,
59        source: BoxedError,
60        #[snafu(implicit)]
61        location: Location,
62    },
63
64    #[snafu(display("Error encountered while creating sink table for flow: {create:?}"))]
65    CreateSinkTable {
66        create: CreateTableExpr,
67        source: BoxedError,
68        #[snafu(implicit)]
69        location: Location,
70    },
71
72    #[snafu(display("Time error"))]
73    Time {
74        source: common_time::error::Error,
75        #[snafu(implicit)]
76        location: Location,
77    },
78
79    #[snafu(display("No available frontend found after timeout: {timeout:?}, context: {context}"))]
80    NoAvailableFrontend {
81        timeout: std::time::Duration,
82        context: String,
83        #[snafu(implicit)]
84        location: Location,
85    },
86
87    #[snafu(display("External error"))]
88    External {
89        source: BoxedError,
90        #[snafu(implicit)]
91        location: Location,
92    },
93
94    #[snafu(display("Internal error"))]
95    Internal {
96        reason: String,
97        #[snafu(implicit)]
98        location: Location,
99    },
100
101    /// TODO(discord9): add detailed location of column
102    #[snafu(display("Failed to eval stream"))]
103    Eval {
104        source: EvalError,
105        #[snafu(implicit)]
106        location: Location,
107    },
108
109    #[snafu(display("Table not found: {name}"))]
110    TableNotFound {
111        name: String,
112        #[snafu(implicit)]
113        location: Location,
114    },
115
116    #[snafu(display("Table not found: {msg}, meta error: {source}"))]
117    TableNotFoundMeta {
118        source: common_meta::error::Error,
119        msg: String,
120        #[snafu(implicit)]
121        location: Location,
122    },
123
124    #[snafu(display("Flow not found, id={id}"))]
125    FlowNotFound {
126        id: FlowId,
127        #[snafu(implicit)]
128        location: Location,
129    },
130
131    #[snafu(display("Failed to list flows in flownode={id:?}"))]
132    ListFlows {
133        id: Option<common_meta::FlownodeId>,
134        source: common_meta::error::Error,
135        #[snafu(implicit)]
136        location: Location,
137    },
138
139    #[snafu(display("Flow already exist, id={id}"))]
140    FlowAlreadyExist {
141        id: FlowId,
142        #[snafu(implicit)]
143        location: Location,
144    },
145
146    #[snafu(display("Failed to join task"))]
147    JoinTask {
148        #[snafu(source)]
149        error: tokio::task::JoinError,
150        #[snafu(implicit)]
151        location: Location,
152    },
153
154    #[snafu(display("Invalid query: {reason}"))]
155    InvalidQuery {
156        reason: String,
157        #[snafu(implicit)]
158        location: Location,
159    },
160
161    #[snafu(display("Not implement in flow: {reason}"))]
162    NotImplemented {
163        reason: String,
164        #[snafu(implicit)]
165        location: Location,
166    },
167
168    #[snafu(display("Invalid auth config"))]
169    IllegalAuthConfig { source: auth::error::Error },
170
171    #[snafu(display("Flow plan error: {reason}"))]
172    Plan {
173        reason: String,
174        #[snafu(implicit)]
175        location: Location,
176    },
177
178    #[snafu(display("Unsupported: {reason}"))]
179    Unsupported {
180        reason: String,
181        #[snafu(implicit)]
182        location: Location,
183    },
184
185    #[snafu(display("Unsupported temporal filter: {reason}"))]
186    UnsupportedTemporalFilter {
187        reason: String,
188        #[snafu(implicit)]
189        location: Location,
190    },
191
192    #[snafu(display("Datatypes error: {source} with extra message: {extra}"))]
193    Datatypes {
194        source: datatypes::Error,
195        extra: String,
196        #[snafu(implicit)]
197        location: Location,
198    },
199
200    #[snafu(display("Arrow error: {raw:?} in context: {context}"))]
201    Arrow {
202        #[snafu(source)]
203        raw: ArrowError,
204        context: String,
205        #[snafu(implicit)]
206        location: Location,
207    },
208
209    #[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
210    Datafusion {
211        #[snafu(source)]
212        raw: datafusion_common::DataFusionError,
213        context: String,
214        #[snafu(implicit)]
215        location: Location,
216    },
217
218    #[snafu(display("Unexpected: {reason}"))]
219    Unexpected {
220        reason: String,
221        #[snafu(implicit)]
222        location: Location,
223    },
224
225    #[snafu(display("Illegal check task state: {reason}"))]
226    IllegalCheckTaskState {
227        reason: String,
228        #[snafu(implicit)]
229        location: Location,
230    },
231
232    #[snafu(display(
233        "Failed to sync with check task for flow {} with allow_drop={}",
234        flow_id,
235        allow_drop
236    ))]
237    SyncCheckTask {
238        flow_id: FlowId,
239        allow_drop: bool,
240        #[snafu(implicit)]
241        location: Location,
242    },
243
244    #[snafu(display("Failed to start server"))]
245    StartServer {
246        #[snafu(implicit)]
247        location: Location,
248        source: servers::error::Error,
249    },
250
251    #[snafu(display("Failed to shutdown server"))]
252    ShutdownServer {
253        #[snafu(implicit)]
254        location: Location,
255        source: servers::error::Error,
256    },
257
258    #[snafu(display("Failed to initialize meta client"))]
259    MetaClientInit {
260        #[snafu(implicit)]
261        location: Location,
262        source: meta_client::error::Error,
263    },
264
265    #[snafu(display("Failed to parse address {}", addr))]
266    ParseAddr {
267        addr: String,
268        #[snafu(source)]
269        error: std::net::AddrParseError,
270    },
271
272    #[snafu(display("Failed to get cache from cache registry: {}", name))]
273    CacheRequired {
274        #[snafu(implicit)]
275        location: Location,
276        name: String,
277    },
278
279    #[snafu(display("Invalid request: {context}"))]
280    InvalidRequest {
281        context: String,
282        source: client::Error,
283        #[snafu(implicit)]
284        location: Location,
285    },
286
287    #[snafu(display("Failed to encode logical plan in substrait"))]
288    SubstraitEncodeLogicalPlan {
289        #[snafu(implicit)]
290        location: Location,
291        source: substrait::error::Error,
292    },
293
294    #[snafu(display("Failed to convert column schema to proto column def"))]
295    ConvertColumnSchema {
296        #[snafu(implicit)]
297        location: Location,
298        source: operator::error::Error,
299    },
300
301    #[snafu(display("Failed to create channel manager for gRPC client"))]
302    InvalidClientConfig {
303        #[snafu(implicit)]
304        location: Location,
305        source: common_grpc::error::Error,
306    },
307}
308
309/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user
310pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
311    let msg = err.to_string();
312    let last_err_msg = common_error::ext::StackError::last(&err).to_string();
313    let code = err.status_code() as u32;
314    let header = from_err_code_msg_to_header(code, &last_err_msg);
315
316    tonic::Status::with_metadata(
317        tonic::Code::InvalidArgument,
318        msg,
319        MetadataMap::from_headers(header),
320    )
321}
322
323/// Result type for flow module
324pub type Result<T> = std::result::Result<T, Error>;
325
326impl ErrorExt for Error {
327    fn status_code(&self) -> StatusCode {
328        match self {
329            Self::Eval { .. }
330            | Self::JoinTask { .. }
331            | Self::Datafusion { .. }
332            | Self::InsertIntoFlow { .. }
333            | Self::NoAvailableFrontend { .. }
334            | Self::FlowNotRecovered { .. } => StatusCode::Internal,
335            Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
336            Self::TableNotFound { .. }
337            | Self::TableNotFoundMeta { .. }
338            | Self::ListFlows { .. } => StatusCode::TableNotFound,
339            Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
340            Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
341            Self::CreateFlow { .. }
342            | Self::CreateSinkTable { .. }
343            | Self::Arrow { .. }
344            | Self::Time { .. } => StatusCode::EngineExecuteQuery,
345            Self::Unexpected { .. }
346            | Self::SyncCheckTask { .. }
347            | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
348            Self::NotImplemented { .. }
349            | Self::UnsupportedTemporalFilter { .. }
350            | Self::Unsupported { .. } => StatusCode::Unsupported,
351            Self::External { source, .. } => source.status_code(),
352            Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
353            Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
354                source.status_code()
355            }
356            Self::MetaClientInit { source, .. } => source.status_code(),
357
358            Self::InvalidQuery { .. }
359            | Self::InvalidRequest { .. }
360            | Self::ParseAddr { .. }
361            | Self::IllegalAuthConfig { .. }
362            | Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
363
364            Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
365
366            Error::ConvertColumnSchema { source, .. } => source.status_code(),
367        }
368    }
369
370    fn as_any(&self) -> &dyn Any {
371        self
372    }
373}
374
375define_into_tonic_status!(Error);
376
377impl From<EvalError> for Error {
378    fn from(e: EvalError) -> Self {
379        Err::<(), _>(e).context(EvalSnafu).unwrap_err()
380    }
381}