1use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Error type of this module.
///
/// Every variant's `display` attribute holds its rendered message; variants
/// carrying a `location` field have it filled in implicitly by snafu.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// A function was called with input data types it does not support.
    #[snafu(display("Unsupported input datatypes {datatypes:?} in function {function}"))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Casting an arrow array of type `data_type` into a vector failed.
    #[snafu(display("Failed to cast arrow array into vector: {data_type:?}"))]
    IntoVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        data_type: ArrowDatatype,
    },

    /// Downcasting a vector failed; `err_msg` carries the details.
    #[snafu(display("Failed to downcast vector: {err_msg}"))]
    DowncastVector { err_msg: String },

    /// An input type was rejected; `err_msg` carries the details.
    #[snafu(display("Invalid input type: {err_msg}"))]
    InvalidInputType {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        err_msg: String,
    },

    /// The `input_types` state was illegal.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// Transparent wrapper around a [`DataFusionError`].
    #[snafu(transparent)]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        #[snafu(implicit)]
        location: Location,
        source: common_recordbatch::error::Error,
    },

    /// Casting an array to the arrow type `typ` failed.
    #[snafu(display("Failed to cast array to {typ:?}"))]
    TypeCast {
        #[snafu(source)]
        error: ArrowError,
        typ: ArrowDatatype,
        #[snafu(implicit)]
        location: Location,
    },

    /// A compute operation on arrow arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine could not cast a value.
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Obtaining a scalar vector failed.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Executing a function failed; the boxed source is part of the message.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// Decoding a logical plan failed; the boxed source is part of the message.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The `TableMutationHandler` was unexpectedly missing.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// The `ProcedureServiceHandler` was unexpectedly missing.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// The `FlowServiceHandler` was unexpectedly missing.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; `err_msg` carries the details.
    #[snafu(display("Invalid function args: {err_msg}"))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Permission was denied; `err_msg` carries the details.
    #[snafu(display("Permission denied: {err_msg}"))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// The vector string `vec_str` was invalid.
    #[snafu(display("Invalid vector string: {vec_str}"))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {name}"))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}
203
204pub type Result<T> = std::result::Result<T, Error>;
205
206impl ErrorExt for Error {
207 fn status_code(&self) -> StatusCode {
208 match self {
209 Error::DowncastVector { .. }
210 | Error::InvalidInputState { .. }
211 | Error::ToScalarValue { .. }
212 | Error::GetScalarVector { .. }
213 | Error::ArrowCompute { .. }
214 | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
215
216 Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
217
218 Error::InvalidInputType { source, .. }
219 | Error::IntoVector { source, .. }
220 | Error::FromScalarValue { source, .. }
221 | Error::InvalidVectorString { source, .. } => source.status_code(),
222
223 Error::MissingTableMutationHandler { .. }
224 | Error::MissingProcedureServiceHandler { .. }
225 | Error::MissingFlowServiceHandler { .. }
226 | Error::RegisterUdf { .. } => StatusCode::Unexpected,
227
228 Error::UnsupportedInputDataType { .. }
229 | Error::TypeCast { .. }
230 | Error::InvalidFuncArgs { .. } => StatusCode::InvalidArguments,
231
232 Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
233
234 Error::DecodePlan { source, .. }
235 | Error::Execute { source, .. }
236 | Error::ProcedureService { source, .. }
237 | Error::TableMutation { source, .. } => source.status_code(),
238
239 Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
240 }
241 }
242
243 fn as_any(&self) -> &dyn Any {
244 self
245 }
246}
247
248impl From<Error> for DataFusionError {
249 fn from(e: Error) -> DataFusionError {
250 DataFusionError::External(Box::new(e))
251 }
252}
253
254pub fn datafusion_status_code<T: ErrorExt + 'static>(
256 e: &DataFusionError,
257 default_status: Option<StatusCode>,
258) -> StatusCode {
259 match e {
260 DataFusionError::Internal(_) => StatusCode::Internal,
261 DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
262 DataFusionError::Plan(_) => StatusCode::PlanQuery,
263 DataFusionError::External(e) => {
264 if let Some(ext) = (*e).downcast_ref::<T>() {
265 ext.status_code()
266 } else {
267 default_status.unwrap_or(StatusCode::EngineExecuteQuery)
268 }
269 }
270 DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
271 _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
272 }
273}