common_query/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Errors produced by this module.
///
/// Variants that wrap another error keep it in a `source` field (or in an `error` field
/// marked `#[snafu(source)]`); variants with a `location` field have it filled in implicitly
/// by snafu.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// `function` was called with input `datatypes` it does not support.
    #[snafu(display("Unsupported input datatypes {datatypes:?} in function {function}"))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Casting an arrow array into a vector failed.
    #[snafu(display("Failed to cast arrow array into vector"))]
    FromArrowArray {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Casting an arrow array into a vector failed; `data_type` is included in the message.
    #[snafu(display("Failed to cast arrow array into vector: {data_type:?}"))]
    IntoVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        data_type: ArrowDatatype,
    },

    /// An accumulator could not be created; `err_msg` carries the details.
    #[snafu(display("Failed to create accumulator: {err_msg}"))]
    CreateAccumulator { err_msg: String },

    /// A vector could not be downcast; `err_msg` carries the details.
    #[snafu(display("Failed to downcast vector: {err_msg}"))]
    DowncastVector { err_msg: String },

    /// An accumulator implementation misbehaved; `err_msg` carries the details.
    #[snafu(display("Bad accumulator implementation: {err_msg}"))]
    BadAccumulatorImpl {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An input type was rejected; `err_msg` carries the details.
    #[snafu(display("Invalid input type: {err_msg}"))]
    InvalidInputType {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        err_msg: String,
    },

    /// The `input_types` state was found in an illegal status.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// Wraps a [`DataFusionError`] that has no more specific variant here.
    #[snafu(display("General DataFusion error"))]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        #[snafu(implicit)]
        location: Location,
        source: common_recordbatch::error::Error,
    },

    /// Converting an arrow schema failed.
    #[snafu(display("Failed to convert arrow schema"))]
    ConvertArrowSchema {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Casting an array to the arrow data type `typ` failed.
    #[snafu(display("Failed to cast array to {typ:?}"))]
    TypeCast {
        #[snafu(source)]
        error: ArrowError,
        typ: arrow::datatypes::DataType,
        #[snafu(implicit)]
        location: Location,
    },

    /// A compute operation on arrow arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine failed to cast a value.
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// A scalar vector could not be obtained.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Executing a function failed; the message embeds the boxed `source`.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// Decoding a logical plan failed; the message embeds the boxed `source`.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// No `TableMutationHandler` was available where one was expected.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `ProcedureServiceHandler` was available where one was expected.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `FlowServiceHandler` was available where one was expected.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; `err_msg` carries the details.
    #[snafu(display("Invalid function args: {err_msg}"))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The operation was not permitted; `err_msg` carries the details.
    #[snafu(display("Permission denied: {err_msg}"))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// The vector string `vec_str` was invalid.
    #[snafu(display("Invalid vector string: {vec_str}"))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {name}"))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}
227
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
229
230impl ErrorExt for Error {
231    fn status_code(&self) -> StatusCode {
232        match self {
233            Error::CreateAccumulator { .. }
234            | Error::DowncastVector { .. }
235            | Error::InvalidInputState { .. }
236            | Error::BadAccumulatorImpl { .. }
237            | Error::ToScalarValue { .. }
238            | Error::GetScalarVector { .. }
239            | Error::ArrowCompute { .. }
240            | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
241
242            Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
243
244            Error::InvalidInputType { source, .. }
245            | Error::IntoVector { source, .. }
246            | Error::FromScalarValue { source, .. }
247            | Error::ConvertArrowSchema { source, .. }
248            | Error::FromArrowArray { source, .. }
249            | Error::InvalidVectorString { source, .. } => source.status_code(),
250
251            Error::MissingTableMutationHandler { .. }
252            | Error::MissingProcedureServiceHandler { .. }
253            | Error::MissingFlowServiceHandler { .. }
254            | Error::RegisterUdf { .. } => StatusCode::Unexpected,
255
256            Error::UnsupportedInputDataType { .. }
257            | Error::TypeCast { .. }
258            | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
259
260            Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
261
262            Error::DecodePlan { source, .. }
263            | Error::Execute { source, .. }
264            | Error::ProcedureService { source, .. }
265            | Error::TableMutation { source, .. } => source.status_code(),
266
267            Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
268        }
269    }
270
271    fn as_any(&self) -> &dyn Any {
272        self
273    }
274}
275
276impl From<Error> for DataFusionError {
277    fn from(e: Error) -> DataFusionError {
278        DataFusionError::External(Box::new(e))
279    }
280}
281
282/// Try to get the proper [`StatusCode`] of [`DataFusionError].
283pub fn datafusion_status_code<T: ErrorExt + 'static>(
284    e: &DataFusionError,
285    default_status: Option<StatusCode>,
286) -> StatusCode {
287    match e {
288        DataFusionError::Internal(_) => StatusCode::Internal,
289        DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
290        DataFusionError::Plan(_) => StatusCode::PlanQuery,
291        DataFusionError::External(e) => {
292            if let Some(ext) = (*e).downcast_ref::<T>() {
293                ext.status_code()
294            } else {
295                default_status.unwrap_or(StatusCode::EngineExecuteQuery)
296            }
297        }
298        DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
299        _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
300    }
301}