Skip to main content

flow/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Error definition for flow module
16
17use std::any::Any;
18
19use api::v1::CreateTableExpr;
20use arrow_schema::ArrowError;
21use common_error::ext::{BoxedError, RetryHint};
22use common_error::{
23    GREPTIME_DB_HEADER_ERROR_RETRY_HINT, define_into_tonic_status, from_err_code_msg_to_header,
24};
25use common_macro::stack_trace_debug;
26use common_telemetry::common_error::ext::ErrorExt;
27use common_telemetry::common_error::status_code::StatusCode;
28use snafu::{Location, ResultExt, Snafu};
29use tonic::codegen::http::HeaderValue;
30use tonic::metadata::MetadataMap;
31
32use crate::FlowId;
33use crate::expr::EvalError;
34
35/// This error is used to represent all possible errors that can occur in the flow module.
36#[derive(Snafu)]
37#[snafu(visibility(pub))]
38#[stack_trace_debug]
39pub enum Error {
40    #[snafu(display(
41        "Failed to insert into flow: region_id={}, flow_ids={:?}",
42        region_id,
43        flow_ids
44    ))]
45    InsertIntoFlow {
46        region_id: u64,
47        flow_ids: Vec<u64>,
48        source: BoxedError,
49        #[snafu(implicit)]
50        location: Location,
51    },
52
53    #[snafu(display("Flow engine is still recovering"))]
54    FlowNotRecovered {
55        #[snafu(implicit)]
56        location: Location,
57    },
58
59    #[snafu(display("Error encountered while creating flow: {sql}"))]
60    CreateFlow {
61        sql: String,
62        source: BoxedError,
63        #[snafu(implicit)]
64        location: Location,
65    },
66
67    #[snafu(display("Error encountered while creating sink table for flow: {create:?}"))]
68    CreateSinkTable {
69        create: CreateTableExpr,
70        source: BoxedError,
71        #[snafu(implicit)]
72        location: Location,
73    },
74
75    #[snafu(display("Time error"))]
76    Time {
77        source: common_time::error::Error,
78        #[snafu(implicit)]
79        location: Location,
80    },
81
82    #[snafu(display("No available frontend found after timeout: {timeout:?}, context: {context}"))]
83    NoAvailableFrontend {
84        timeout: std::time::Duration,
85        context: String,
86        #[snafu(implicit)]
87        location: Location,
88    },
89
90    #[snafu(display("External error"))]
91    External {
92        source: BoxedError,
93        #[snafu(implicit)]
94        location: Location,
95    },
96
97    #[snafu(display("Internal error"))]
98    Internal {
99        reason: String,
100        #[snafu(implicit)]
101        location: Location,
102    },
103
104    /// TODO(discord9): add detailed location of column
105    #[snafu(display("Failed to eval stream"))]
106    Eval {
107        source: EvalError,
108        #[snafu(implicit)]
109        location: Location,
110    },
111
112    #[snafu(display("Table not found: {name}"))]
113    TableNotFound {
114        name: String,
115        #[snafu(implicit)]
116        location: Location,
117    },
118
119    #[snafu(display("Table not found: {msg}, meta error: {source}"))]
120    TableNotFoundMeta {
121        source: common_meta::error::Error,
122        msg: String,
123        #[snafu(implicit)]
124        location: Location,
125    },
126
127    #[snafu(display("Flow not found, id={id}"))]
128    FlowNotFound {
129        id: FlowId,
130        #[snafu(implicit)]
131        location: Location,
132    },
133
134    #[snafu(display("Failed to list flows in flownode={id:?}"))]
135    ListFlows {
136        id: Option<common_meta::FlownodeId>,
137        source: common_meta::error::Error,
138        #[snafu(implicit)]
139        location: Location,
140    },
141
142    #[snafu(display("Flow already exist, id={id}"))]
143    FlowAlreadyExist {
144        id: FlowId,
145        #[snafu(implicit)]
146        location: Location,
147    },
148
149    #[snafu(display("Failed to join task"))]
150    JoinTask {
151        #[snafu(source)]
152        error: tokio::task::JoinError,
153        #[snafu(implicit)]
154        location: Location,
155    },
156
157    #[snafu(display("Invalid query: {reason}"))]
158    InvalidQuery {
159        reason: String,
160        #[snafu(implicit)]
161        location: Location,
162    },
163
164    #[snafu(display("Not implement in flow: {reason}"))]
165    NotImplemented {
166        reason: String,
167        #[snafu(implicit)]
168        location: Location,
169    },
170
171    #[snafu(display("Flow plan error: {reason}"))]
172    Plan {
173        reason: String,
174        #[snafu(implicit)]
175        location: Location,
176    },
177
178    #[snafu(display("Unsupported: {reason}"))]
179    Unsupported {
180        reason: String,
181        #[snafu(implicit)]
182        location: Location,
183    },
184
185    #[snafu(display("Unsupported temporal filter: {reason}"))]
186    UnsupportedTemporalFilter {
187        reason: String,
188        #[snafu(implicit)]
189        location: Location,
190    },
191
192    #[snafu(display("Datatypes error: {source} with extra message: {extra}"))]
193    Datatypes {
194        source: datatypes::Error,
195        extra: String,
196        #[snafu(implicit)]
197        location: Location,
198    },
199
200    #[snafu(display("Arrow error: {raw:?} in context: {context}"))]
201    Arrow {
202        #[snafu(source)]
203        raw: ArrowError,
204        context: String,
205        #[snafu(implicit)]
206        location: Location,
207    },
208
209    #[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
210    Datafusion {
211        #[snafu(source)]
212        raw: datafusion_common::DataFusionError,
213        context: String,
214        #[snafu(implicit)]
215        location: Location,
216    },
217
218    #[snafu(display("Unexpected: {reason}"))]
219    Unexpected {
220        reason: String,
221        #[snafu(implicit)]
222        location: Location,
223    },
224
225    #[snafu(display("Illegal check task state: {reason}"))]
226    IllegalCheckTaskState {
227        reason: String,
228        #[snafu(implicit)]
229        location: Location,
230    },
231
232    #[snafu(display(
233        "Failed to sync with check task for flow {} with allow_drop={}",
234        flow_id,
235        allow_drop
236    ))]
237    SyncCheckTask {
238        flow_id: FlowId,
239        allow_drop: bool,
240        #[snafu(implicit)]
241        location: Location,
242    },
243
244    #[snafu(display("Failed to start server"))]
245    StartServer {
246        #[snafu(implicit)]
247        location: Location,
248        source: servers::error::Error,
249    },
250
251    #[snafu(display("Failed to shutdown server"))]
252    ShutdownServer {
253        #[snafu(implicit)]
254        location: Location,
255        source: servers::error::Error,
256    },
257
258    #[snafu(display("Failed to initialize meta client"))]
259    MetaClientInit {
260        #[snafu(implicit)]
261        location: Location,
262        source: meta_client::error::Error,
263    },
264
265    #[snafu(display("Failed to parse address {}", addr))]
266    ParseAddr {
267        addr: String,
268        #[snafu(source)]
269        error: std::net::AddrParseError,
270    },
271
272    #[snafu(display("Failed to get cache from cache registry: {}", name))]
273    CacheRequired {
274        #[snafu(implicit)]
275        location: Location,
276        name: String,
277    },
278
279    #[snafu(display("Invalid request: {context}"))]
280    InvalidRequest {
281        context: String,
282        source: client::Error,
283        #[snafu(implicit)]
284        location: Location,
285    },
286
287    #[snafu(display("Failed to encode logical plan in substrait"))]
288    SubstraitEncodeLogicalPlan {
289        #[snafu(implicit)]
290        location: Location,
291        source: substrait::error::Error,
292    },
293
294    #[snafu(display("Failed to convert column schema to proto column def"))]
295    ConvertColumnSchema {
296        #[snafu(implicit)]
297        location: Location,
298        source: operator::error::Error,
299    },
300
301    #[snafu(display("Failed to create channel manager for gRPC client"))]
302    InvalidClientConfig {
303        #[snafu(implicit)]
304        location: Location,
305        source: common_grpc::error::Error,
306    },
307}
308
309/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user
310pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
311    let msg = err.to_string();
312    let last_err_msg = common_error::ext::StackError::last(&err).to_string();
313    let code = err.status_code() as u32;
314    let mut header = from_err_code_msg_to_header(code, &last_err_msg);
315    header.insert(
316        GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
317        HeaderValue::from_static(err.retry_hint().as_str()),
318    );
319
320    tonic::Status::with_metadata(
321        tonic::Code::InvalidArgument,
322        msg,
323        MetadataMap::from_headers(header),
324    )
325}
326
327/// Result type for flow module
328pub type Result<T> = std::result::Result<T, Error>;
329
330impl ErrorExt for Error {
331    fn status_code(&self) -> StatusCode {
332        match self {
333            Self::Eval { .. }
334            | Self::JoinTask { .. }
335            | Self::Datafusion { .. }
336            | Self::InsertIntoFlow { .. }
337            | Self::NoAvailableFrontend { .. }
338            | Self::FlowNotRecovered { .. } => StatusCode::Internal,
339            Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
340            Self::TableNotFound { .. }
341            | Self::TableNotFoundMeta { .. }
342            | Self::ListFlows { .. } => StatusCode::TableNotFound,
343            Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
344            Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
345            Self::CreateFlow { .. }
346            | Self::CreateSinkTable { .. }
347            | Self::Arrow { .. }
348            | Self::Time { .. } => StatusCode::EngineExecuteQuery,
349            Self::Unexpected { .. }
350            | Self::SyncCheckTask { .. }
351            | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
352            Self::NotImplemented { .. }
353            | Self::UnsupportedTemporalFilter { .. }
354            | Self::Unsupported { .. } => StatusCode::Unsupported,
355            Self::External { source, .. } => source.status_code(),
356            Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
357            Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
358                source.status_code()
359            }
360            Self::MetaClientInit { source, .. } => source.status_code(),
361
362            Self::InvalidQuery { .. }
363            | Self::InvalidRequest { .. }
364            | Self::ParseAddr { .. }
365            | Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
366
367            Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
368
369            Error::ConvertColumnSchema { source, .. } => source.status_code(),
370        }
371    }
372
373    fn as_any(&self) -> &dyn Any {
374        self
375    }
376
377    fn retry_hint(&self) -> RetryHint {
378        match self {
379            Self::FlowNotRecovered { .. } | Self::NoAvailableFrontend { .. } => {
380                RetryHint::Retryable
381            }
382
383            Self::InsertIntoFlow { source, .. }
384            | Self::CreateFlow { source, .. }
385            | Self::CreateSinkTable { source, .. }
386            | Self::External { source, .. } => source.retry_hint(),
387
388            Self::Time { source, .. } => source.retry_hint(),
389            Self::TableNotFoundMeta { source, .. } | Self::ListFlows { source, .. } => {
390                source.retry_hint()
391            }
392            Self::Datatypes { source, .. } => source.retry_hint(),
393            Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
394                source.retry_hint()
395            }
396            Self::MetaClientInit { source, .. } => source.retry_hint(),
397            Self::InvalidRequest { source, .. } => source.retry_hint(),
398            Self::SubstraitEncodeLogicalPlan { source, .. } => source.retry_hint(),
399            Self::ConvertColumnSchema { source, .. } => source.retry_hint(),
400            Self::InvalidClientConfig { source, .. } => source.retry_hint(),
401
402            _ => RetryHint::NonRetryable,
403        }
404    }
405}
406
407define_into_tonic_status!(Error);
408
409impl From<EvalError> for Error {
410    fn from(e: EvalError) -> Self {
411        Err::<(), _>(e).context(EvalSnafu).unwrap_err()
412    }
413}