1use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Error type of this module.
///
/// The user-facing text of every variant comes from its
/// `#[snafu(display(...))]` attribute. Fields marked `#[snafu(implicit)]`
/// are filled in by snafu rather than by the code building the error.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// A function received input data types it does not support.
    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// A scalar value could not be cast into a vector.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An arrow array could not be cast into a vector.
    #[snafu(display("Failed to cast arrow array into vector"))]
    FromArrowArray {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An arrow array of the recorded `data_type` could not be cast into a vector.
    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
    IntoVector {
        data_type: ArrowDatatype,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An accumulator could not be created; `err_msg` carries the reason.
    #[snafu(display("Failed to create accumulator: {}", err_msg))]
    CreateAccumulator { err_msg: String },

    /// A vector could not be downcast; `err_msg` carries the reason.
    #[snafu(display("Failed to downcast vector: {}", err_msg))]
    DowncastVector { err_msg: String },

    /// An accumulator implementation misbehaved; `err_msg` carries the details.
    #[snafu(display("Bad accumulator implementation: {}", err_msg))]
    BadAccumulatorImpl {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An input type was rejected; `err_msg` carries the details.
    #[snafu(display("Invalid input type: {}", err_msg))]
    InvalidInputType {
        err_msg: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The recorded input types are in an unexpected state.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// Wrapper around an arbitrary [`DataFusionError`].
    #[snafu(display("General DataFusion error"))]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A DataFusion record batch stream could not be converted.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        source: common_recordbatch::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// An arrow array could not be cast to the arrow type `typ`.
    #[snafu(display("Failed to cast array to {:?}", typ))]
    TypeCast {
        typ: ArrowDatatype,
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An arrow compute operation failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A value could not be cast by the query engine.
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A scalar vector could not be obtained.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Executing a function failed; the boxed cause is part of the message.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Decoding a logical plan failed; the boxed cause is part of the message.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// No `TableMutationHandler` was available although one was expected.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `ProcedureServiceHandler` was available although one was expected.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `FlowServiceHandler` was available although one was expected.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; `err_msg` carries the details.
    #[snafu(display("Invalid function args: {}", err_msg))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The operation was not permitted; `err_msg` carries the details.
    #[snafu(display("Permission denied: {}", err_msg))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// The string `vec_str` is not a valid vector representation.
    #[snafu(display("Invalid vector string: {}", vec_str))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {}", name))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}
220
221pub type Result<T> = std::result::Result<T, Error>;
222
223impl ErrorExt for Error {
224 fn status_code(&self) -> StatusCode {
225 match self {
226 Error::CreateAccumulator { .. }
227 | Error::DowncastVector { .. }
228 | Error::InvalidInputState { .. }
229 | Error::BadAccumulatorImpl { .. }
230 | Error::ToScalarValue { .. }
231 | Error::GetScalarVector { .. }
232 | Error::ArrowCompute { .. }
233 | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
234
235 Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
236
237 Error::InvalidInputType { source, .. }
238 | Error::IntoVector { source, .. }
239 | Error::FromScalarValue { source, .. }
240 | Error::FromArrowArray { source, .. }
241 | Error::InvalidVectorString { source, .. } => source.status_code(),
242
243 Error::MissingTableMutationHandler { .. }
244 | Error::MissingProcedureServiceHandler { .. }
245 | Error::MissingFlowServiceHandler { .. }
246 | Error::RegisterUdf { .. } => StatusCode::Unexpected,
247
248 Error::UnsupportedInputDataType { .. }
249 | Error::TypeCast { .. }
250 | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
251
252 Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
253
254 Error::DecodePlan { source, .. }
255 | Error::Execute { source, .. }
256 | Error::ProcedureService { source, .. }
257 | Error::TableMutation { source, .. } => source.status_code(),
258
259 Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
260 }
261 }
262
263 fn as_any(&self) -> &dyn Any {
264 self
265 }
266}
267
268impl From<Error> for DataFusionError {
269 fn from(e: Error) -> DataFusionError {
270 DataFusionError::External(Box::new(e))
271 }
272}
273
274pub fn datafusion_status_code<T: ErrorExt + 'static>(
276 e: &DataFusionError,
277 default_status: Option<StatusCode>,
278) -> StatusCode {
279 match e {
280 DataFusionError::Internal(_) => StatusCode::Internal,
281 DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
282 DataFusionError::Plan(_) => StatusCode::PlanQuery,
283 DataFusionError::External(e) => {
284 if let Some(ext) = (*e).downcast_ref::<T>() {
285 ext.status_code()
286 } else {
287 default_status.unwrap_or(StatusCode::EngineExecuteQuery)
288 }
289 }
290 DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
291 _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
292 }
293}