common_query/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
33    UnsupportedInputDataType {
34        function: String,
35        datatypes: Vec<ConcreteDataType>,
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display("Failed to cast scalar value into vector"))]
41    FromScalarValue {
42        #[snafu(implicit)]
43        location: Location,
44        source: DataTypeError,
45    },
46
47    #[snafu(display("Failed to cast arrow array into vector"))]
48    FromArrowArray {
49        #[snafu(implicit)]
50        location: Location,
51        source: DataTypeError,
52    },
53
54    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
55    IntoVector {
56        #[snafu(implicit)]
57        location: Location,
58        source: DataTypeError,
59        data_type: ArrowDatatype,
60    },
61
62    #[snafu(display("Failed to downcast vector: {}", err_msg))]
63    DowncastVector { err_msg: String },
64
65    #[snafu(display("Invalid input type: {}", err_msg))]
66    InvalidInputType {
67        #[snafu(implicit)]
68        location: Location,
69        source: DataTypeError,
70        err_msg: String,
71    },
72
73    #[snafu(display(
74        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
75    ))]
76    InvalidInputState {
77        #[snafu(implicit)]
78        location: Location,
79    },
80
81    #[snafu(display("General DataFusion error"))]
82    GeneralDataFusion {
83        #[snafu(source)]
84        error: DataFusionError,
85        #[snafu(implicit)]
86        location: Location,
87    },
88
89    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
90    ConvertDfRecordBatchStream {
91        #[snafu(implicit)]
92        location: Location,
93        source: common_recordbatch::error::Error,
94    },
95
96    #[snafu(display("Failed to cast array to {:?}", typ))]
97    TypeCast {
98        #[snafu(source)]
99        error: ArrowError,
100        typ: arrow::datatypes::DataType,
101        #[snafu(implicit)]
102        location: Location,
103    },
104
105    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
106    ArrowCompute {
107        #[snafu(source)]
108        error: ArrowError,
109        #[snafu(implicit)]
110        location: Location,
111    },
112
113    #[snafu(display("Query engine fail to cast value"))]
114    ToScalarValue {
115        #[snafu(implicit)]
116        location: Location,
117        source: DataTypeError,
118    },
119
120    #[snafu(display("Failed to get scalar vector"))]
121    GetScalarVector {
122        #[snafu(implicit)]
123        location: Location,
124        source: DataTypeError,
125    },
126
127    #[snafu(display("Failed to execute function: {source}"))]
128    Execute {
129        #[snafu(implicit)]
130        location: Location,
131        source: BoxedError,
132    },
133
134    #[snafu(display("Failed to decode logical plan: {source}"))]
135    DecodePlan {
136        #[snafu(implicit)]
137        location: Location,
138        source: BoxedError,
139    },
140
141    #[snafu(display("Failed to do table mutation"))]
142    TableMutation {
143        source: BoxedError,
144        #[snafu(implicit)]
145        location: Location,
146    },
147
148    #[snafu(display("Failed to do procedure task"))]
149    ProcedureService {
150        source: BoxedError,
151        #[snafu(implicit)]
152        location: Location,
153    },
154
155    #[snafu(display("Missing TableMutationHandler, not expected"))]
156    MissingTableMutationHandler {
157        #[snafu(implicit)]
158        location: Location,
159    },
160
161    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
162    MissingProcedureServiceHandler {
163        #[snafu(implicit)]
164        location: Location,
165    },
166
167    #[snafu(display("Missing FlowServiceHandler, not expected"))]
168    MissingFlowServiceHandler {
169        #[snafu(implicit)]
170        location: Location,
171    },
172
173    #[snafu(display("Invalid function args: {}", err_msg))]
174    InvalidFuncArgs {
175        err_msg: String,
176        #[snafu(implicit)]
177        location: Location,
178    },
179
180    #[snafu(display("Permission denied: {}", err_msg))]
181    PermissionDenied {
182        err_msg: String,
183        #[snafu(implicit)]
184        location: Location,
185    },
186
187    #[snafu(display("Can't found alive flownode"))]
188    FlownodeNotFound {
189        #[snafu(implicit)]
190        location: Location,
191    },
192
193    #[snafu(display("Invalid vector string: {}", vec_str))]
194    InvalidVectorString {
195        vec_str: String,
196        source: DataTypeError,
197        #[snafu(implicit)]
198        location: Location,
199    },
200
201    #[snafu(display("Failed to register UDF: {}", name))]
202    RegisterUdf {
203        name: String,
204        #[snafu(source)]
205        error: DataFusionError,
206        #[snafu(implicit)]
207        location: Location,
208    },
209}
210
211pub type Result<T> = std::result::Result<T, Error>;
212
213impl ErrorExt for Error {
214    fn status_code(&self) -> StatusCode {
215        match self {
216            Error::DowncastVector { .. }
217            | Error::InvalidInputState { .. }
218            | Error::ToScalarValue { .. }
219            | Error::GetScalarVector { .. }
220            | Error::ArrowCompute { .. }
221            | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
222
223            Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
224
225            Error::InvalidInputType { source, .. }
226            | Error::IntoVector { source, .. }
227            | Error::FromScalarValue { source, .. }
228            | Error::FromArrowArray { source, .. }
229            | Error::InvalidVectorString { source, .. } => source.status_code(),
230
231            Error::MissingTableMutationHandler { .. }
232            | Error::MissingProcedureServiceHandler { .. }
233            | Error::MissingFlowServiceHandler { .. }
234            | Error::RegisterUdf { .. } => StatusCode::Unexpected,
235
236            Error::UnsupportedInputDataType { .. }
237            | Error::TypeCast { .. }
238            | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
239
240            Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
241
242            Error::DecodePlan { source, .. }
243            | Error::Execute { source, .. }
244            | Error::ProcedureService { source, .. }
245            | Error::TableMutation { source, .. } => source.status_code(),
246
247            Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
248        }
249    }
250
251    fn as_any(&self) -> &dyn Any {
252        self
253    }
254}
255
256impl From<Error> for DataFusionError {
257    fn from(e: Error) -> DataFusionError {
258        DataFusionError::External(Box::new(e))
259    }
260}
261
262/// Try to get the proper [`StatusCode`] of [`DataFusionError].
263pub fn datafusion_status_code<T: ErrorExt + 'static>(
264    e: &DataFusionError,
265    default_status: Option<StatusCode>,
266) -> StatusCode {
267    match e {
268        DataFusionError::Internal(_) => StatusCode::Internal,
269        DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
270        DataFusionError::Plan(_) => StatusCode::PlanQuery,
271        DataFusionError::External(e) => {
272            if let Some(ext) = (*e).downcast_ref::<T>() {
273                ext.status_code()
274            } else {
275                default_status.unwrap_or(StatusCode::EngineExecuteQuery)
276            }
277        }
278        DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
279        _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
280    }
281}