1use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Errors produced by this module.
///
/// Fields in every variant are ordered the same way: variant-specific
/// payload first, then the underlying cause (`source` / `error`), then the
/// implicitly captured `location`.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// The given input data types are not supported by the named function.
    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an arrow array into a vector failed.
    #[snafu(display("Failed to cast arrow array into vector"))]
    FromArrowArray {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an arrow array into a vector failed; carries the arrow data
    /// type that is printed in the message.
    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
    IntoVector {
        data_type: ArrowDatatype,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Downcasting a vector failed. This variant records only a message,
    /// without a location or an underlying cause.
    #[snafu(display("Failed to downcast vector: {}", err_msg))]
    DowncastVector { err_msg: String },

    /// An input type was invalid; `err_msg` is printed in the message.
    #[snafu(display("Invalid input type: {}", err_msg))]
    InvalidInputType {
        err_msg: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The `input_types` state was illegal (see the display message for the
    /// suggested follow-up).
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// A DataFusion error without more specific classification.
    #[snafu(display("General DataFusion error"))]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        source: common_recordbatch::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting an array to the arrow data type `typ` failed.
    #[snafu(display("Failed to cast array to {:?}", typ))]
    TypeCast {
        typ: ArrowDatatype,
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A compute operation on arrow arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine failed to cast a value.
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Getting a scalar vector failed.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Executing a function failed; the boxed cause is included in the
    /// message.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Decoding a logical plan failed; the boxed cause is included in the
    /// message.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// No `TableMutationHandler` was available.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `ProcedureServiceHandler` was available.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `FlowServiceHandler` was available.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; `err_msg` is printed in the message.
    #[snafu(display("Invalid function args: {}", err_msg))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Permission was denied; `err_msg` is printed in the message.
    #[snafu(display("Permission denied: {}", err_msg))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// A vector string was invalid; the offending string is printed in the
    /// message.
    #[snafu(display("Invalid vector string: {}", vec_str))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {}", name))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}
210
211pub type Result<T> = std::result::Result<T, Error>;
212
213impl ErrorExt for Error {
214 fn status_code(&self) -> StatusCode {
215 match self {
216 Error::DowncastVector { .. }
217 | Error::InvalidInputState { .. }
218 | Error::ToScalarValue { .. }
219 | Error::GetScalarVector { .. }
220 | Error::ArrowCompute { .. }
221 | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
222
223 Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
224
225 Error::InvalidInputType { source, .. }
226 | Error::IntoVector { source, .. }
227 | Error::FromScalarValue { source, .. }
228 | Error::FromArrowArray { source, .. }
229 | Error::InvalidVectorString { source, .. } => source.status_code(),
230
231 Error::MissingTableMutationHandler { .. }
232 | Error::MissingProcedureServiceHandler { .. }
233 | Error::MissingFlowServiceHandler { .. }
234 | Error::RegisterUdf { .. } => StatusCode::Unexpected,
235
236 Error::UnsupportedInputDataType { .. }
237 | Error::TypeCast { .. }
238 | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
239
240 Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
241
242 Error::DecodePlan { source, .. }
243 | Error::Execute { source, .. }
244 | Error::ProcedureService { source, .. }
245 | Error::TableMutation { source, .. } => source.status_code(),
246
247 Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
248 }
249 }
250
251 fn as_any(&self) -> &dyn Any {
252 self
253 }
254}
255
256impl From<Error> for DataFusionError {
257 fn from(e: Error) -> DataFusionError {
258 DataFusionError::External(Box::new(e))
259 }
260}
261
262pub fn datafusion_status_code<T: ErrorExt + 'static>(
264 e: &DataFusionError,
265 default_status: Option<StatusCode>,
266) -> StatusCode {
267 match e {
268 DataFusionError::Internal(_) => StatusCode::Internal,
269 DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
270 DataFusionError::Plan(_) => StatusCode::PlanQuery,
271 DataFusionError::External(e) => {
272 if let Some(ext) = (*e).downcast_ref::<T>() {
273 ext.status_code()
274 } else {
275 default_status.unwrap_or(StatusCode::EngineExecuteQuery)
276 }
277 }
278 DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
279 _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
280 }
281}