1use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Error type of this module.
///
/// Every variant carries its user-facing message in a `#[snafu(display(..))]`
/// attribute. Fields marked `#[snafu(implicit)]` (the `location` fields) are
/// filled in by snafu instead of being passed by the code that raises the error.
// NOTE(review): no `Debug` is derived here; presumably `stack_trace_debug`
// (from `common_macro`) generates it — confirm against that macro.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// A function received input data types it does not support.
    ///
    /// `function` is the function name and `datatypes` the rejected types; both
    /// appear in the message.
    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
    UnsupportedInputDataType {
        function: String,
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed with a data-type error.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Casting an arrow array into a vector failed; `data_type` is the arrow
    /// type printed in the message.
    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
    IntoVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        data_type: ArrowDatatype,
    },

    /// An input had an invalid type; `err_msg` holds the detail shown to the user.
    #[snafu(display("Invalid input type: {}", err_msg))]
    InvalidInputType {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        err_msg: String,
    },

    /// The `input_types` state was illegal. The message hints that DataFusion's
    /// UDAF execution logic may have changed.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    // Transparent wrapper around a `DataFusionError`: there is no display
    // attribute of its own. Kept as a plain comment rather than a doc comment
    // because snafu can fall back to doc comments for the display text —
    // NOTE(review): confirm how that interacts with `transparent` before
    // converting this to `///`.
    #[snafu(transparent)]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        #[snafu(implicit)]
        location: Location,
        source: common_recordbatch::error::Error,
    },

    /// Arrow failed to cast an array to the target type `typ`.
    #[snafu(display("Failed to cast array to {:?}", typ))]
    TypeCast {
        #[snafu(source)]
        error: ArrowError,
        typ: arrow::datatypes::DataType,
        #[snafu(implicit)]
        location: Location,
    },

    /// An arrow compute operation on arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine failed to cast a value (to a scalar value, going by the
    /// variant name).
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Obtaining a scalar vector failed with a data-type error.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Executing a function failed. The boxed source error is embedded in the
    /// message.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// Decoding a logical plan failed. The boxed source error is embedded in the
    /// message.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// A table mutation failed with the boxed source error.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed with the boxed source error.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// No `TableMutationHandler` was available; the message marks this as unexpected.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `ProcedureServiceHandler` was available; the message marks this as unexpected.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `FlowServiceHandler` was available; the message marks this as unexpected.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// A function was called with invalid arguments; `err_msg` holds the detail.
    #[snafu(display("Invalid function args: {}", err_msg))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The operation was denied; `err_msg` holds the detail.
    #[snafu(display("Permission denied: {}", err_msg))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// A vector string was invalid; `vec_str` is the offending text shown in the
    /// message.
    #[snafu(display("Invalid vector string: {}", vec_str))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed with a DataFusion error.
    #[snafu(display("Failed to register UDF: {}", name))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A prefix config contained an invalid character; `prefix` is the rejected
    /// value.
    ///
    /// Unlike the other variants, this one records no `location`.
    #[snafu(display("Invalid character in prefix config: {}", prefix))]
    InvalidColumnPrefix { prefix: String },

    /// A `DynFilterPayload::Datafusion` payload of `payload_size_bytes` bytes
    /// exceeded the configured limit of `max_payload_bytes` bytes.
    #[snafu(display(
        "DynFilterPayload::Datafusion is {} bytes, which exceeds the configured limit of {} bytes",
        payload_size_bytes,
        max_payload_bytes
    ))]
    DynFilterPayloadTooLarge {
        payload_size_bytes: usize,
        max_payload_bytes: usize,
        #[snafu(implicit)]
        location: Location,
    },
}
215
/// Shorthand for a [`std::result::Result`] whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
217
218impl ErrorExt for Error {
219 fn status_code(&self) -> StatusCode {
220 match self {
221 Error::InvalidInputState { .. }
222 | Error::ToScalarValue { .. }
223 | Error::GetScalarVector { .. }
224 | Error::ArrowCompute { .. }
225 | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
226
227 Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
228
229 Error::InvalidInputType { source, .. }
230 | Error::IntoVector { source, .. }
231 | Error::FromScalarValue { source, .. }
232 | Error::InvalidVectorString { source, .. } => source.status_code(),
233
234 Error::MissingTableMutationHandler { .. }
235 | Error::MissingProcedureServiceHandler { .. }
236 | Error::MissingFlowServiceHandler { .. }
237 | Error::RegisterUdf { .. } => StatusCode::Unexpected,
238
239 Error::UnsupportedInputDataType { .. }
240 | Error::TypeCast { .. }
241 | Error::InvalidFuncArgs { .. }
242 | Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments,
243
244 Error::DynFilterPayloadTooLarge { .. } => StatusCode::PlanQuery,
245
246 Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
247
248 Error::DecodePlan { source, .. }
249 | Error::Execute { source, .. }
250 | Error::ProcedureService { source, .. }
251 | Error::TableMutation { source, .. } => source.status_code(),
252
253 Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
254 }
255 }
256
257 fn as_any(&self) -> &dyn Any {
258 self
259 }
260}
261
262impl From<Error> for DataFusionError {
263 fn from(e: Error) -> DataFusionError {
264 DataFusionError::External(Box::new(e))
265 }
266}
267
268pub fn datafusion_status_code<T: ErrorExt + 'static>(
270 e: &DataFusionError,
271 default_status: Option<StatusCode>,
272) -> StatusCode {
273 match e {
274 DataFusionError::Internal(_) => StatusCode::Internal,
275 DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
276 DataFusionError::Plan(_) => StatusCode::PlanQuery,
277 DataFusionError::External(e) => {
278 if let Some(ext) = (*e).downcast_ref::<T>() {
279 ext.status_code()
280 } else {
281 default_status.unwrap_or(StatusCode::EngineExecuteQuery)
282 }
283 }
284 DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
285 _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
286 }
287}