1use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Error type of this module.
///
/// The user-facing message of every variant is given by its `snafu(display)` attribute.
/// Variants holding a `source` field (or a field marked `#[snafu(source)]`) wrap an underlying
/// error, and fields marked `#[snafu(implicit)]` are supplied by snafu when the error is built.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// The named function was given input data types it does not support.
    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an arrow array into a vector failed.
    #[snafu(display("Failed to cast arrow array into vector"))]
    FromArrowArray {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an arrow array into a vector failed; `data_type` is included in the message.
    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
    IntoVector {
        data_type: ArrowDatatype,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An accumulator could not be created; `err_msg` carries the details.
    #[snafu(display("Failed to create accumulator: {}", err_msg))]
    CreateAccumulator { err_msg: String },

    /// A vector could not be downcast; `err_msg` carries the details.
    #[snafu(display("Failed to downcast vector: {}", err_msg))]
    DowncastVector { err_msg: String },

    /// An accumulator implementation is considered bad; `err_msg` carries the details.
    #[snafu(display("Bad accumulator implementation: {}", err_msg))]
    BadAccumulatorImpl {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An input type was invalid; `err_msg` carries the details.
    #[snafu(display("Invalid input type: {}", err_msg))]
    InvalidInputType {
        err_msg: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The `input_types` status was illegal; the message points at a possible change in
    /// DataFusion's UDAF execution logic.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// Wraps a general [`DataFusionError`].
    #[snafu(display("General DataFusion error"))]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        source: common_recordbatch::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting an arrow schema failed.
    #[snafu(display("Failed to convert arrow schema"))]
    ConvertArrowSchema {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an array to the arrow data type `typ` failed.
    #[snafu(display("Failed to cast array to {:?}", typ))]
    TypeCast {
        typ: arrow::datatypes::DataType,
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A compute operation on arrow arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine failed to cast a value.
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A scalar vector could not be obtained.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Executing a function failed; the source error is embedded in the message.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Decoding a logical plan failed; the source error is embedded in the message.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// No `TableMutationHandler` was available, which is not expected.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `ProcedureServiceHandler` was available, which is not expected.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `FlowServiceHandler` was available, which is not expected.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; `err_msg` carries the details.
    #[snafu(display("Invalid function args: {}", err_msg))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Permission was denied; `err_msg` carries the details.
    #[snafu(display("Permission denied: {}", err_msg))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// The string `vec_str` is not a valid vector string.
    #[snafu(display("Invalid vector string: {}", vec_str))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {}", name))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}
227
228pub type Result<T> = std::result::Result<T, Error>;
229
230impl ErrorExt for Error {
231 fn status_code(&self) -> StatusCode {
232 match self {
233 Error::CreateAccumulator { .. }
234 | Error::DowncastVector { .. }
235 | Error::InvalidInputState { .. }
236 | Error::BadAccumulatorImpl { .. }
237 | Error::ToScalarValue { .. }
238 | Error::GetScalarVector { .. }
239 | Error::ArrowCompute { .. }
240 | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
241
242 Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
243
244 Error::InvalidInputType { source, .. }
245 | Error::IntoVector { source, .. }
246 | Error::FromScalarValue { source, .. }
247 | Error::ConvertArrowSchema { source, .. }
248 | Error::FromArrowArray { source, .. }
249 | Error::InvalidVectorString { source, .. } => source.status_code(),
250
251 Error::MissingTableMutationHandler { .. }
252 | Error::MissingProcedureServiceHandler { .. }
253 | Error::MissingFlowServiceHandler { .. }
254 | Error::RegisterUdf { .. } => StatusCode::Unexpected,
255
256 Error::UnsupportedInputDataType { .. }
257 | Error::TypeCast { .. }
258 | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
259
260 Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
261
262 Error::DecodePlan { source, .. }
263 | Error::Execute { source, .. }
264 | Error::ProcedureService { source, .. }
265 | Error::TableMutation { source, .. } => source.status_code(),
266
267 Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
268 }
269 }
270
271 fn as_any(&self) -> &dyn Any {
272 self
273 }
274}
275
276impl From<Error> for DataFusionError {
277 fn from(e: Error) -> DataFusionError {
278 DataFusionError::External(Box::new(e))
279 }
280}
281
282pub fn datafusion_status_code<T: ErrorExt + 'static>(
284 e: &DataFusionError,
285 default_status: Option<StatusCode>,
286) -> StatusCode {
287 match e {
288 DataFusionError::Internal(_) => StatusCode::Internal,
289 DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
290 DataFusionError::Plan(_) => StatusCode::PlanQuery,
291 DataFusionError::External(e) => {
292 if let Some(ext) = (*e).downcast_ref::<T>() {
293 ext.status_code()
294 } else {
295 default_status.unwrap_or(StatusCode::EngineExecuteQuery)
296 }
297 }
298 DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
299 _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
300 }
301}