1use std::any::Any;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::ScalarValue;
22use datatypes::prelude::ConcreteDataType;
23use datatypes::schema::SchemaRef;
24use snafu::{Location, Snafu};
25
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
27
/// Errors defined by this module.
///
/// Every variant carries a `location` field marked `#[snafu(implicit)]`, so
/// it is filled in by snafu instead of being supplied at the call site.
/// Variants that wrap an underlying error keep it either in a field named
/// `source` or in an `error` field marked `#[snafu(source)]`.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// Arrow returned an error while a DataFusion record batch was created.
    #[snafu(display("Fail to create datafusion record batch"))]
    NewDfRecordBatch {
        #[snafu(source)]
        error: datatypes::arrow::error::ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An error originating from the `datatypes` crate.
    #[snafu(display("Data types error"))]
    DataTypes {
        #[snafu(implicit)]
        location: Location,
        source: datatypes::error::Error,
    },

    /// An external error, carried as a [`BoxedError`].
    #[snafu(display("External error"))]
    External {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    /// Creating `RecordBatches` failed; `reason` is included in the message.
    #[snafu(display("Failed to create RecordBatches, reason: {}", reason))]
    CreateRecordBatches {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting an Arrow schema failed with a `datatypes` error.
    #[snafu(display("Failed to convert Arrow schema"))]
    SchemaConversion {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    // A DataFusion error exposed through `#[snafu(transparent)]`, i.e. without
    // a display message of its own.
    // NOTE(review): presumably raised while polling a record batch stream —
    // confirm against the call sites.
    #[snafu(transparent)]
    PollStream {
        #[snafu(source)]
        error: datafusion::error::DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// DataFusion returned an error while a physical expression was created.
    #[snafu(display("Create physical expr error"))]
    PhysicalExpr {
        #[snafu(source)]
        error: datafusion::error::DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Arrow returned an error while a record batch was formatted.
    #[snafu(display("Fail to format record batch"))]
    Format {
        #[snafu(source)]
        error: datatypes::arrow::error::ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting the scalar value `v` to an Arrow scalar failed; `v` is
    /// printed with its `Debug` representation in the message.
    #[snafu(display("Failed to convert {v:?} to Arrow scalar"))]
    ToArrowScalar {
        v: ScalarValue,
        #[snafu(source)]
        error: datafusion_common::DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    /// Projecting an Arrow record batch failed; the message includes the
    /// `Debug` output of both `schema` and `projection`.
    #[snafu(display(
        "Failed to project Arrow RecordBatch with schema {:?} and projection {:?}",
        schema,
        projection,
    ))]
    ProjectArrowRecordBatch {
        #[snafu(source)]
        error: datatypes::arrow::error::ArrowError,
        #[snafu(implicit)]
        location: Location,
        schema: datatypes::schema::SchemaRef,
        projection: Vec<usize>,
    },

    /// The column `column_name` does not exist in the table `table_name`.
    #[snafu(display("Column {} not exists in table {}", column_name, table_name))]
    ColumnNotExists {
        column_name: String,
        table_name: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Casting a vector from `from_type` to `to_type` failed with a
    /// `datatypes` error.
    #[snafu(display(
        "Failed to cast vector of type '{:?}' to type '{:?}'",
        from_type,
        to_type,
    ))]
    CastVector {
        from_type: ConcreteDataType,
        to_type: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
        source: datatypes::error::Error,
    },

    /// Downcasting a vector from `from_type` to `to_type` failed. Unlike
    /// `CastVector`, this variant has no underlying source error.
    #[snafu(display(
        "Failed to downcast vector of type '{:?}' to type '{:?}'",
        from_type,
        to_type
    ))]
    DowncastVector {
        from_type: ConcreteDataType,
        to_type: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    /// Arrow returned an error during a computation.
    #[snafu(display("Error occurs when performing arrow computation"))]
    ArrowCompute {
        #[snafu(source)]
        error: datatypes::arrow::error::ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    /// The requested operation is not supported; `reason` is included in the
    /// message.
    #[snafu(display("Unsupported operation: {}", reason))]
    UnsupportedOperation {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An attempt was made to construct an empty stream.
    #[snafu(display("Cannot construct an empty stream"))]
    EmptyStream {
        #[snafu(implicit)]
        location: Location,
    },

    /// The schemas `left` and `right` do not match; both are printed with
    /// their `Debug` representation in the message.
    #[snafu(display("Schema not match, left: {:?}, right: {:?}", left, right))]
    SchemaNotMatch {
        left: SchemaRef,
        right: SchemaRef,
        #[snafu(implicit)]
        location: Location,
    },
    /// A stream timed out; wraps tokio's `Elapsed` error.
    #[snafu(display("Stream timeout"))]
    StreamTimeout {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: tokio::time::error::Elapsed,
    },
    /// A record batch slice index (`visit_index`) went past `size`.
    #[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
    RecordBatchSliceIndexOverflow {
        #[snafu(implicit)]
        location: Location,
        size: usize,
        visit_index: usize,
    },
}
191
192impl ErrorExt for Error {
193 fn status_code(&self) -> StatusCode {
194 match self {
195 Error::NewDfRecordBatch { .. }
196 | Error::EmptyStream { .. }
197 | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
198
199 Error::DataTypes { .. }
200 | Error::CreateRecordBatches { .. }
201 | Error::Format { .. }
202 | Error::ToArrowScalar { .. }
203 | Error::ProjectArrowRecordBatch { .. }
204 | Error::PhysicalExpr { .. }
205 | Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
206
207 Error::DowncastVector { .. } => StatusCode::Unexpected,
208
209 Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
210
211 Error::ArrowCompute { .. } => StatusCode::IllegalState,
212
213 Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
214
215 Error::External { source, .. } => source.status_code(),
216
217 Error::UnsupportedOperation { .. } => StatusCode::Unsupported,
218
219 Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
220 source.status_code()
221 }
222
223 Error::StreamTimeout { .. } => StatusCode::Cancelled,
224 }
225 }
226
227 fn as_any(&self) -> &dyn Any {
228 self
229 }
230}