1use std::any::Any;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::ScalarValue;
22use datatypes::prelude::ConcreteDataType;
23use datatypes::schema::SchemaRef;
24use snafu::{Location, Snafu};
25
26pub type Result<T, E = Error> = std::result::Result<T, E>;
27
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32 #[snafu(display("Fail to create datafusion record batch"))]
33 NewDfRecordBatch {
34 #[snafu(source)]
35 error: datatypes::arrow::error::ArrowError,
36 #[snafu(implicit)]
37 location: Location,
38 },
39
40 #[snafu(display("Data types error"))]
41 DataTypes {
42 #[snafu(implicit)]
43 location: Location,
44 source: datatypes::error::Error,
45 },
46
47 #[snafu(display("External error"))]
48 External {
49 #[snafu(implicit)]
50 location: Location,
51 source: BoxedError,
52 },
53
54 #[snafu(display("Failed to create RecordBatches, reason: {}", reason))]
55 CreateRecordBatches {
56 reason: String,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to convert Arrow schema"))]
62 SchemaConversion {
63 source: datatypes::error::Error,
64 #[snafu(implicit)]
65 location: Location,
66 },
67
68 #[snafu(transparent)]
69 PollStream {
70 #[snafu(source)]
71 error: datafusion::error::DataFusionError,
72 #[snafu(implicit)]
73 location: Location,
74 },
75
76 #[snafu(display("Create physical expr error"))]
77 PhysicalExpr {
78 #[snafu(source)]
79 error: datafusion::error::DataFusionError,
80 #[snafu(implicit)]
81 location: Location,
82 },
83
84 #[snafu(display("Fail to format record batch"))]
85 Format {
86 #[snafu(source)]
87 error: datatypes::arrow::error::ArrowError,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Failed to convert {v:?} to Arrow scalar"))]
93 ToArrowScalar {
94 v: ScalarValue,
95 #[snafu(source)]
96 error: datafusion_common::DataFusionError,
97 #[snafu(implicit)]
98 location: Location,
99 },
100
101 #[snafu(display(
102 "Failed to project Arrow RecordBatch with schema {:?} and projection {:?}",
103 schema,
104 projection,
105 ))]
106 ProjectArrowRecordBatch {
107 #[snafu(source)]
108 error: datatypes::arrow::error::ArrowError,
109 #[snafu(implicit)]
110 location: Location,
111 schema: datatypes::schema::SchemaRef,
112 projection: Vec<usize>,
113 },
114
115 #[snafu(display("Column {} not exists in table {}", column_name, table_name))]
116 ColumnNotExists {
117 column_name: String,
118 table_name: String,
119 #[snafu(implicit)]
120 location: Location,
121 },
122
123 #[snafu(display(
124 "Failed to cast vector of type '{:?}' to type '{:?}'",
125 from_type,
126 to_type,
127 ))]
128 CastVector {
129 from_type: ConcreteDataType,
130 to_type: ConcreteDataType,
131 #[snafu(implicit)]
132 location: Location,
133 source: datatypes::error::Error,
134 },
135
136 #[snafu(display("Error occurs when performing arrow computation"))]
137 ArrowCompute {
138 #[snafu(source)]
139 error: datatypes::arrow::error::ArrowError,
140 #[snafu(implicit)]
141 location: Location,
142 },
143
144 #[snafu(display("Unsupported operation: {}", reason))]
145 UnsupportedOperation {
146 reason: String,
147 #[snafu(implicit)]
148 location: Location,
149 },
150
151 #[snafu(display("Cannot construct an empty stream"))]
152 EmptyStream {
153 #[snafu(implicit)]
154 location: Location,
155 },
156
157 #[snafu(display("Schema not match, left: {:?}, right: {:?}", left, right))]
158 SchemaNotMatch {
159 left: SchemaRef,
160 right: SchemaRef,
161 #[snafu(implicit)]
162 location: Location,
163 },
164 #[snafu(display("Stream timeout"))]
165 StreamTimeout {
166 #[snafu(implicit)]
167 location: Location,
168 #[snafu(source)]
169 error: tokio::time::error::Elapsed,
170 },
171 #[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
172 RecordBatchSliceIndexOverflow {
173 #[snafu(implicit)]
174 location: Location,
175 size: usize,
176 visit_index: usize,
177 },
178}
179
180impl ErrorExt for Error {
181 fn status_code(&self) -> StatusCode {
182 match self {
183 Error::NewDfRecordBatch { .. }
184 | Error::EmptyStream { .. }
185 | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
186
187 Error::DataTypes { .. }
188 | Error::CreateRecordBatches { .. }
189 | Error::Format { .. }
190 | Error::ToArrowScalar { .. }
191 | Error::ProjectArrowRecordBatch { .. }
192 | Error::PhysicalExpr { .. }
193 | Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
194
195 Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
196
197 Error::ArrowCompute { .. } => StatusCode::IllegalState,
198
199 Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
200
201 Error::External { source, .. } => source.status_code(),
202
203 Error::UnsupportedOperation { .. } => StatusCode::Unsupported,
204
205 Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
206 source.status_code()
207 }
208
209 Error::StreamTimeout { .. } => StatusCode::Cancelled,
210 }
211 }
212
213 fn as_any(&self) -> &dyn Any {
214 self
215 }
216}