common_query/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Error type of this module.
///
/// Every variant renders its message through `#[snafu(display(...))]`.
/// Fields named `location` are marked `#[snafu(implicit)]`, and fields named
/// `source` (or tagged `#[snafu(source)]`) hold the underlying cause.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// A function was called with input data types it does not support.
    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an arrow array into a vector failed.
    #[snafu(display("Failed to cast arrow array into vector"))]
    FromArrowArray {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an arrow array into a vector failed; `data_type` is included
    /// in the rendered message.
    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
    IntoVector {
        data_type: ArrowDatatype,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An accumulator could not be created. Carries only a message, no
    /// location or source.
    #[snafu(display("Failed to create accumulator: {}", err_msg))]
    CreateAccumulator { err_msg: String },

    /// A vector could not be downcast. Carries only a message, no location
    /// or source.
    #[snafu(display("Failed to downcast vector: {}", err_msg))]
    DowncastVector { err_msg: String },

    /// An accumulator implementation misbehaved; details are in `err_msg`.
    #[snafu(display("Bad accumulator implementation: {}", err_msg))]
    BadAccumulatorImpl {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An input type was rejected; details are in `err_msg`.
    #[snafu(display("Invalid input type: {}", err_msg))]
    InvalidInputType {
        err_msg: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The recorded input types were in an illegal state; the message hints
    /// at a change in DataFusion's UDAF execution logic.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// Generic wrapper around a [`DataFusionError`].
    #[snafu(display("General DataFusion error"))]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        source: common_recordbatch::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an array to the arrow type `typ` failed.
    #[snafu(display("Failed to cast array to {:?}", typ))]
    TypeCast {
        typ: ArrowDatatype,
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A compute operation on arrow arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine failed to cast a value.
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A scalar vector could not be obtained.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Executing a function failed; the rendered message embeds the source.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Decoding a logical plan failed; the rendered message embeds the source.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The `TableMutationHandler` was unexpectedly missing.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// The `ProcedureServiceHandler` was unexpectedly missing.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// The `FlowServiceHandler` was unexpectedly missing.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; details are in `err_msg`.
    #[snafu(display("Invalid function args: {}", err_msg))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The operation was denied; details are in `err_msg`.
    #[snafu(display("Permission denied: {}", err_msg))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// The vector string `vec_str` was rejected as invalid.
    #[snafu(display("Invalid vector string: {}", vec_str))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {}", name))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}
220
221pub type Result<T> = std::result::Result<T, Error>;
222
223impl ErrorExt for Error {
224    fn status_code(&self) -> StatusCode {
225        match self {
226            Error::CreateAccumulator { .. }
227            | Error::DowncastVector { .. }
228            | Error::InvalidInputState { .. }
229            | Error::BadAccumulatorImpl { .. }
230            | Error::ToScalarValue { .. }
231            | Error::GetScalarVector { .. }
232            | Error::ArrowCompute { .. }
233            | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
234
235            Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
236
237            Error::InvalidInputType { source, .. }
238            | Error::IntoVector { source, .. }
239            | Error::FromScalarValue { source, .. }
240            | Error::FromArrowArray { source, .. }
241            | Error::InvalidVectorString { source, .. } => source.status_code(),
242
243            Error::MissingTableMutationHandler { .. }
244            | Error::MissingProcedureServiceHandler { .. }
245            | Error::MissingFlowServiceHandler { .. }
246            | Error::RegisterUdf { .. } => StatusCode::Unexpected,
247
248            Error::UnsupportedInputDataType { .. }
249            | Error::TypeCast { .. }
250            | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
251
252            Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
253
254            Error::DecodePlan { source, .. }
255            | Error::Execute { source, .. }
256            | Error::ProcedureService { source, .. }
257            | Error::TableMutation { source, .. } => source.status_code(),
258
259            Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
260        }
261    }
262
263    fn as_any(&self) -> &dyn Any {
264        self
265    }
266}
267
268impl From<Error> for DataFusionError {
269    fn from(e: Error) -> DataFusionError {
270        DataFusionError::External(Box::new(e))
271    }
272}
273
274/// Try to get the proper [`StatusCode`] of [`DataFusionError].
275pub fn datafusion_status_code<T: ErrorExt + 'static>(
276    e: &DataFusionError,
277    default_status: Option<StatusCode>,
278) -> StatusCode {
279    match e {
280        DataFusionError::Internal(_) => StatusCode::Internal,
281        DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
282        DataFusionError::Plan(_) => StatusCode::PlanQuery,
283        DataFusionError::External(e) => {
284            if let Some(ext) = (*e).downcast_ref::<T>() {
285                ext.status_code()
286            } else {
287                default_status.unwrap_or(StatusCode::EngineExecuteQuery)
288            }
289        }
290        DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
291        _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
292    }
293}