1use std::any::Any;
16
17use arrow::error::ArrowError;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::DataFusionError;
22use datatypes::arrow;
23use datatypes::arrow::datatypes::DataType as ArrowDatatype;
24use datatypes::error::Error as DataTypeError;
25use datatypes::prelude::ConcreteDataType;
26use snafu::{Location, Snafu};
27
/// Error type of this module.
///
/// Each variant's user-facing message is given by its `#[snafu(display(...))]`
/// attribute (or, for the `transparent` variant, by the wrapped error). Every
/// variant except `InvalidColumnPrefix` carries a `location` field marked
/// `#[snafu(implicit)]`.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// A function was called with input data types it does not support.
    #[snafu(display("Unsupported input datatypes {:?} in function {}", datatypes, function))]
    UnsupportedInputDataType {
        // Name of the function that rejected the inputs.
        function: String,
        // The rejected input data types.
        datatypes: Vec<ConcreteDataType>,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a scalar value into a vector failed.
    #[snafu(display("Failed to cast scalar value into vector"))]
    FromScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Casting an arrow array into a vector failed.
    #[snafu(display("Failed to cast arrow array into vector: {:?}", data_type))]
    IntoVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        // Arrow data type reported in the message.
        data_type: ArrowDatatype,
    },

    /// An input type was invalid; `err_msg` carries the details.
    #[snafu(display("Invalid input type: {}", err_msg))]
    InvalidInputType {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
        err_msg: String,
    },

    /// The `input_types` status was illegal. Per the message, this may mean
    /// DataFusion changed its UDAF execution logic.
    #[snafu(display(
        "Illegal input_types status, check if DataFusion has changed its UDAF execution logic"
    ))]
    InvalidInputState {
        #[snafu(implicit)]
        location: Location,
    },

    /// Transparent wrapper around a [`DataFusionError`]: it declares no message
    /// of its own.
    #[snafu(transparent)]
    GeneralDataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting DataFusion's record batch stream failed.
    #[snafu(display("Failed to convert DataFusion's recordbatch stream"))]
    ConvertDfRecordBatchStream {
        #[snafu(implicit)]
        location: Location,
        source: common_recordbatch::error::Error,
    },

    /// Casting an array to the arrow data type `typ` failed.
    #[snafu(display("Failed to cast array to {:?}", typ))]
    TypeCast {
        #[snafu(source)]
        error: ArrowError,
        // Target arrow data type of the cast.
        typ: arrow::datatypes::DataType,
        #[snafu(implicit)]
        location: Location,
    },

    /// A compute operation on arrow arrays failed.
    #[snafu(display("Failed to perform compute operation on arrow arrays"))]
    ArrowCompute {
        #[snafu(source)]
        error: ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The query engine failed to cast a value (presumably into a scalar value,
    /// judging by the variant name).
    #[snafu(display("Query engine fail to cast value"))]
    ToScalarValue {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Obtaining a scalar vector failed.
    #[snafu(display("Failed to get scalar vector"))]
    GetScalarVector {
        #[snafu(implicit)]
        location: Location,
        source: DataTypeError,
    },

    /// Executing a function failed; the message embeds the source error.
    #[snafu(display("Failed to execute function: {source}"))]
    Execute {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// Decoding a logical plan failed; the message embeds the source error.
    #[snafu(display("Failed to decode logical plan: {source}"))]
    DecodePlan {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// A table mutation failed.
    #[snafu(display("Failed to do table mutation"))]
    TableMutation {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A procedure task failed.
    #[snafu(display("Failed to do procedure task"))]
    ProcedureService {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// No `TableMutationHandler` was available; the message marks this as not
    /// expected.
    #[snafu(display("Missing TableMutationHandler, not expected"))]
    MissingTableMutationHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `ProcedureServiceHandler` was available; the message marks this as
    /// not expected.
    #[snafu(display("Missing ProcedureServiceHandler, not expected"))]
    MissingProcedureServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// No `FlowServiceHandler` was available; the message marks this as not
    /// expected.
    #[snafu(display("Missing FlowServiceHandler, not expected"))]
    MissingFlowServiceHandler {
        #[snafu(implicit)]
        location: Location,
    },

    /// Function arguments were invalid; `err_msg` carries the details.
    #[snafu(display("Invalid function args: {}", err_msg))]
    InvalidFuncArgs {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Permission was denied; `err_msg` carries the details.
    #[snafu(display("Permission denied: {}", err_msg))]
    PermissionDenied {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// No alive flownode could be found.
    #[snafu(display("Can't found alive flownode"))]
    FlownodeNotFound {
        #[snafu(implicit)]
        location: Location,
    },

    /// A vector string was invalid; `vec_str` is the offending input.
    #[snafu(display("Invalid vector string: {}", vec_str))]
    InvalidVectorString {
        vec_str: String,
        source: DataTypeError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Registering the UDF called `name` failed.
    #[snafu(display("Failed to register UDF: {}", name))]
    RegisterUdf {
        name: String,
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A prefix config contained an invalid character.
    ///
    /// Unlike the other variants, this one has no `location` field.
    #[snafu(display("Invalid character in prefix config: {}", prefix))]
    InvalidColumnPrefix { prefix: String },
}
203
/// Shorthand for a [`std::result::Result`] whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
205
206impl ErrorExt for Error {
207 fn status_code(&self) -> StatusCode {
208 match self {
209 Error::InvalidInputState { .. }
210 | Error::ToScalarValue { .. }
211 | Error::GetScalarVector { .. }
212 | Error::ArrowCompute { .. }
213 | Error::FlownodeNotFound { .. } => StatusCode::EngineExecuteQuery,
214
215 Error::GeneralDataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
216
217 Error::InvalidInputType { source, .. }
218 | Error::IntoVector { source, .. }
219 | Error::FromScalarValue { source, .. }
220 | Error::InvalidVectorString { source, .. } => source.status_code(),
221
222 Error::MissingTableMutationHandler { .. }
223 | Error::MissingProcedureServiceHandler { .. }
224 | Error::MissingFlowServiceHandler { .. }
225 | Error::RegisterUdf { .. } => StatusCode::Unexpected,
226
227 Error::UnsupportedInputDataType { .. }
228 | Error::TypeCast { .. }
229 | Error::InvalidFuncArgs { .. }
230 | Error::InvalidColumnPrefix { .. } => StatusCode::InvalidArguments,
231
232 Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
233
234 Error::DecodePlan { source, .. }
235 | Error::Execute { source, .. }
236 | Error::ProcedureService { source, .. }
237 | Error::TableMutation { source, .. } => source.status_code(),
238
239 Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
240 }
241 }
242
243 fn as_any(&self) -> &dyn Any {
244 self
245 }
246}
247
248impl From<Error> for DataFusionError {
249 fn from(e: Error) -> DataFusionError {
250 DataFusionError::External(Box::new(e))
251 }
252}
253
254pub fn datafusion_status_code<T: ErrorExt + 'static>(
256 e: &DataFusionError,
257 default_status: Option<StatusCode>,
258) -> StatusCode {
259 match e {
260 DataFusionError::Internal(_) => StatusCode::Internal,
261 DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
262 DataFusionError::Plan(_) => StatusCode::PlanQuery,
263 DataFusionError::External(e) => {
264 if let Some(ext) = (*e).downcast_ref::<T>() {
265 ext.status_code()
266 } else {
267 default_status.unwrap_or(StatusCode::EngineExecuteQuery)
268 }
269 }
270 DataFusionError::Diagnostic(_, e) => datafusion_status_code::<T>(e, default_status),
271 _ => default_status.unwrap_or(StatusCode::EngineExecuteQuery),
272 }
273}