common_query/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
33    UnsupportedInputDataType {
34        function: String,
35        datatypes: Vec<ConcreteDataType>,
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display("Failed to cast scalar value into vector"))]
41    FromScalarValue {
42        #[snafu(implicit)]
43        location: Location,
44        source: DataTypeError,
45    },
46
47    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
48    IntoVector {
49        #[snafu(implicit)]
50        location: Location,
51        source: DataTypeError,
52        data_type: ArrowDatatype,
53    },
54
55    #[snafu(display("Invalid input type: {}", err_msg))]
56    InvalidInputType {
57        #[snafu(implicit)]
58        location: Location,
59        source: DataTypeError,
60        err_msg: String,
61    },
62
63    #[snafu(display(
64        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
65    ))]
66    InvalidInputState {
67        #[snafu(implicit)]
68        location: Location,
69    },
70
71    #[snafu(transparent)]
72    GeneralDataFusion {
73        #[snafu(source)]
74        error: DataFusionError,
75        #[snafu(implicit)]
76        location: Location,
77    },
78
79    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
80    ConvertDfRecordBatchStream {
81        #[snafu(implicit)]
82        location: Location,
83        source: common_recordbatch::error::Error,
84    },
85
86    #[snafu(display("Failed to cast array to {:?}", typ))]
87    TypeCast {
88        #[snafu(source)]
89        error: ArrowError,
90        typ: arrow::datatypes::DataType,
91        #[snafu(implicit)]
92        location: Location,
93    },
94
95    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
96    ArrowCompute {
97        #[snafu(source)]
98        error: ArrowError,
99        #[snafu(implicit)]
100        location: Location,
101    },
102
103    #[snafu(display("Query engine fail to cast value"))]
104    ToScalarValue {
105        #[snafu(implicit)]
106        location: Location,
107        source: DataTypeError,
108    },
109
110    #[snafu(display("Failed to get scalar vector"))]
111    GetScalarVector {
112        #[snafu(implicit)]
113        location: Location,
114        source: DataTypeError,
115    },
116
117    #[snafu(display("Failed to execute function: {source}"))]
118    Execute {
119        #[snafu(implicit)]
120        location: Location,
121        source: BoxedError,
122    },
123
124    #[snafu(display("Failed to decode logical plan: {source}"))]
125    DecodePlan {
126        #[snafu(implicit)]
127        location: Location,
128        source: BoxedError,
129    },
130
131    #[snafu(display("Failed to do table mutation"))]
132    TableMutation {
133        source: BoxedError,
134        #[snafu(implicit)]
135        location: Location,
136    },
137
138    #[snafu(display("Failed to do procedure task"))]
139    ProcedureService {
140        source: BoxedError,
141        #[snafu(implicit)]
142        location: Location,
143    },
144
145    #[snafu(display("Missing TableMutationHandler, not expected"))]
146    MissingTableMutationHandler {
147        #[snafu(implicit)]
148        location: Location,
149    },
150
151    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
152    MissingProcedureServiceHandler {
153        #[snafu(implicit)]
154        location: Location,
155    },
156
157    #[snafu(display("Missing FlowServiceHandler, not expected"))]
158    MissingFlowServiceHandler {
159        #[snafu(implicit)]
160        location: Location,
161    },
162
163    #[snafu(display("Invalid function args: {}", err_msg))]
164    InvalidFuncArgs {
165        err_msg: String,
166        #[snafu(implicit)]
167        location: Location,
168    },
169
170    #[snafu(display("Permission denied: {}", err_msg))]
171    PermissionDenied {
172        err_msg: String,
173        #[snafu(implicit)]
174        location: Location,
175    },
176
177    #[snafu(display("Can't found alive flownode"))]
178    FlownodeNotFound {
179        #[snafu(implicit)]
180        location: Location,
181    },
182
183    #[snafu(display("Invalid vector string: {}", vec_str))]
184    InvalidVectorString {
185        vec_str: String,
186        source: DataTypeError,
187        #[snafu(implicit)]
188        location: Location,
189    },
190
191    #[snafu(display("Failed to register UDF: {}", name))]
192    RegisterUdf {
193        name: String,
194        #[snafu(source)]
195        error: DataFusionError,
196        #[snafu(implicit)]
197        location: Location,
198    },
199
200    #[snafu(display("Invalid character in prefix config: {}", prefix))]
201    InvalidColumnPrefix { prefix: String },
202}
203
204pub type Result<T> = std::result::Result<T, Error>;
205
206impl ErrorExt for Error {
207    fn status_code(&self) -> StatusCode {
208        match self {
209            Error::InvalidInputState { .. }
210            | Error::ToScalarValue { .. }
211            | Error::GetScalarVector { .. }
212            | Error::ArrowCompute { .. }
213            | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
214
215            Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
216
217            Error::InvalidInputType { source, .. }
218            | Error::IntoVector { source, .. }
219            | Error::FromScalarValue { source, .. }
220            | Error::InvalidVectorString { source, .. } => source.status_code(),
221
222            Error::MissingTableMutationHandler { .. }
223            | Error::MissingProcedureServiceHandler { .. }
224            | Error::MissingFlowServiceHandler { .. }
225            | Error::RegisterUdf { .. } => StatusCode::Unexpected,
226
227            Error::UnsupportedInputDataType { .. }
228            | Error::TypeCast { .. }
229            | Error::InvalidFuncArgs { .. }
230            | Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments,
231
232            Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
233
234            Error::DecodePlan { source, .. }
235            | Error::Execute { source, .. }
236            | Error::ProcedureService { source, .. }
237            | Error::TableMutation { source, .. } => source.status_code(),
238
239            Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
240        }
241    }
242
243    fn as_any(&self) -> &dyn Any {
244        self
245    }
246}
247
248impl From<Error> for DataFusionError {
249    fn from(e: Error) -> DataFusionError {
250        DataFusionError::External(Box::new(e))
251    }
252}
253
254/// Try to get the proper [`StatusCode`] of [`DataFusionError].
255pub fn datafusion_status_code<T: ErrorExt + 'static>(
256    e: &DataFusionError,
257    default_status: Option<StatusCode>,
258) -> StatusCode {
259    match e {
260        DataFusionError::Internal(_) => StatusCode::Internal,
261        DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
262        DataFusionError::Plan(_) => StatusCode::PlanQuery,
263        DataFusionError::External(e) => {
264            if let Some(ext) = (*e).downcast_ref::<T>() {
265                ext.status_code()
266            } else {
267                default_status.unwrap_or(StatusCode::EngineExecuteQuery)
268            }
269        }
270        DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
271        _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
272    }
273}