common_query/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
33    UnsupportedInputDataType {
34        function: String,
35        datatypes: Vec<ConcreteDataType>,
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display("Failed to cast scalar value into vector"))]
41    FromScalarValue {
42        #[snafu(implicit)]
43        location: Location,
44        source: DataTypeError,
45    },
46
47    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
48    IntoVector {
49        #[snafu(implicit)]
50        location: Location,
51        source: DataTypeError,
52        data_type: ArrowDatatype,
53    },
54
55    #[snafu(display("Failed to downcast vector: {}", err_msg))]
56    DowncastVector { err_msg: String },
57
58    #[snafu(display("Invalid input type: {}", err_msg))]
59    InvalidInputType {
60        #[snafu(implicit)]
61        location: Location,
62        source: DataTypeError,
63        err_msg: String,
64    },
65
66    #[snafu(display(
67        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
68    ))]
69    InvalidInputState {
70        #[snafu(implicit)]
71        location: Location,
72    },
73
74    #[snafu(transparent)]
75    GeneralDataFusion {
76        #[snafu(source)]
77        error: DataFusionError,
78        #[snafu(implicit)]
79        location: Location,
80    },
81
82    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
83    ConvertDfRecordBatchStream {
84        #[snafu(implicit)]
85        location: Location,
86        source: common_recordbatch::error::Error,
87    },
88
89    #[snafu(display("Failed to cast array to {:?}", typ))]
90    TypeCast {
91        #[snafu(source)]
92        error: ArrowError,
93        typ: arrow::datatypes::DataType,
94        #[snafu(implicit)]
95        location: Location,
96    },
97
98    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
99    ArrowCompute {
100        #[snafu(source)]
101        error: ArrowError,
102        #[snafu(implicit)]
103        location: Location,
104    },
105
106    #[snafu(display("Query engine fail to cast value"))]
107    ToScalarValue {
108        #[snafu(implicit)]
109        location: Location,
110        source: DataTypeError,
111    },
112
113    #[snafu(display("Failed to get scalar vector"))]
114    GetScalarVector {
115        #[snafu(implicit)]
116        location: Location,
117        source: DataTypeError,
118    },
119
120    #[snafu(display("Failed to execute function: {source}"))]
121    Execute {
122        #[snafu(implicit)]
123        location: Location,
124        source: BoxedError,
125    },
126
127    #[snafu(display("Failed to decode logical plan: {source}"))]
128    DecodePlan {
129        #[snafu(implicit)]
130        location: Location,
131        source: BoxedError,
132    },
133
134    #[snafu(display("Failed to do table mutation"))]
135    TableMutation {
136        source: BoxedError,
137        #[snafu(implicit)]
138        location: Location,
139    },
140
141    #[snafu(display("Failed to do procedure task"))]
142    ProcedureService {
143        source: BoxedError,
144        #[snafu(implicit)]
145        location: Location,
146    },
147
148    #[snafu(display("Missing TableMutationHandler, not expected"))]
149    MissingTableMutationHandler {
150        #[snafu(implicit)]
151        location: Location,
152    },
153
154    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
155    MissingProcedureServiceHandler {
156        #[snafu(implicit)]
157        location: Location,
158    },
159
160    #[snafu(display("Missing FlowServiceHandler, not expected"))]
161    MissingFlowServiceHandler {
162        #[snafu(implicit)]
163        location: Location,
164    },
165
166    #[snafu(display("Invalid function args: {}", err_msg))]
167    InvalidFuncArgs {
168        err_msg: String,
169        #[snafu(implicit)]
170        location: Location,
171    },
172
173    #[snafu(display("Permission denied: {}", err_msg))]
174    PermissionDenied {
175        err_msg: String,
176        #[snafu(implicit)]
177        location: Location,
178    },
179
180    #[snafu(display("Can't found alive flownode"))]
181    FlownodeNotFound {
182        #[snafu(implicit)]
183        location: Location,
184    },
185
186    #[snafu(display("Invalid vector string: {}", vec_str))]
187    InvalidVectorString {
188        vec_str: String,
189        source: DataTypeError,
190        #[snafu(implicit)]
191        location: Location,
192    },
193
194    #[snafu(display("Failed to register UDF: {}", name))]
195    RegisterUdf {
196        name: String,
197        #[snafu(source)]
198        error: DataFusionError,
199        #[snafu(implicit)]
200        location: Location,
201    },
202}
203
204pub type Result<T> = std::result::Result<T, Error>;
205
206impl ErrorExt for Error {
207    fn status_code(&self) -> StatusCode {
208        match self {
209            Error::DowncastVector { .. }
210            | Error::InvalidInputState { .. }
211            | Error::ToScalarValue { .. }
212            | Error::GetScalarVector { .. }
213            | Error::ArrowCompute { .. }
214            | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
215
216            Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
217
218            Error::InvalidInputType { source, .. }
219            | Error::IntoVector { source, .. }
220            | Error::FromScalarValue { source, .. }
221            | Error::InvalidVectorString { source, .. } => source.status_code(),
222
223            Error::MissingTableMutationHandler { .. }
224            | Error::MissingProcedureServiceHandler { .. }
225            | Error::MissingFlowServiceHandler { .. }
226            | Error::RegisterUdf { .. } => StatusCode::Unexpected,
227
228            Error::UnsupportedInputDataType { .. }
229            | Error::TypeCast { .. }
230            | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
231
232            Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
233
234            Error::DecodePlan { source, .. }
235            | Error::Execute { source, .. }
236            | Error::ProcedureService { source, .. }
237            | Error::TableMutation { source, .. } => source.status_code(),
238
239            Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
240        }
241    }
242
243    fn as_any(&self) -> &dyn Any {
244        self
245    }
246}
247
248impl From<Error> for DataFusionError {
249    fn from(e: Error) -> DataFusionError {
250        DataFusionError::External(Box::new(e))
251    }
252}
253
254/// Try to get the proper [`StatusCode`] of [`DataFusionError].
255pub fn datafusion_status_code<T: ErrorExt + 'static>(
256    e: &DataFusionError,
257    default_status: Option<StatusCode>,
258) -> StatusCode {
259    match e {
260        DataFusionError::Internal(_) => StatusCode::Internal,
261        DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
262        DataFusionError::Plan(_) => StatusCode::PlanQuery,
263        DataFusionError::External(e) => {
264            if let Some(ext) = (*e).downcast_ref::<T>() {
265                ext.status_code()
266            } else {
267                default_status.unwrap_or(StatusCode::EngineExecuteQuery)
268            }
269        }
270        DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
271        _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
272    }
273}