1use std::any::Any;
18
19use arrow_schema::ArrowError;
20use common_error::ext::BoxedError;
21use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
22use common_macro::stack_trace_debug;
23use common_telemetry::common_error::ext::ErrorExt;
24use common_telemetry::common_error::status_code::StatusCode;
25use snafu::{Location, ResultExt, Snafu};
26use tonic::metadata::MetadataMap;
27
28use crate::expr::EvalError;
29use crate::FlowId;
30
31#[derive(Snafu)]
33#[snafu(visibility(pub))]
34#[stack_trace_debug]
35pub enum Error {
36 #[snafu(display(
37 "Failed to insert into flow: region_id={}, flow_ids={:?}",
38 region_id,
39 flow_ids
40 ))]
41 InsertIntoFlow {
42 region_id: u64,
43 flow_ids: Vec<u64>,
44 source: BoxedError,
45 #[snafu(implicit)]
46 location: Location,
47 },
48
49 #[snafu(display("Flow engine is still recovering"))]
50 FlowNotRecovered {
51 #[snafu(implicit)]
52 location: Location,
53 },
54
55 #[snafu(display("Error encountered while creating flow: {sql}"))]
56 CreateFlow {
57 sql: String,
58 source: BoxedError,
59 #[snafu(implicit)]
60 location: Location,
61 },
62
63 #[snafu(display("Time error"))]
64 Time {
65 source: common_time::error::Error,
66 #[snafu(implicit)]
67 location: Location,
68 },
69
70 #[snafu(display(
71 "No available frontend found after timeout: {timeout:?}, context: {context}"
72 ))]
73 NoAvailableFrontend {
74 timeout: std::time::Duration,
75 context: String,
76 #[snafu(implicit)]
77 location: Location,
78 },
79
80 #[snafu(display("External error"))]
81 External {
82 source: BoxedError,
83 #[snafu(implicit)]
84 location: Location,
85 },
86
87 #[snafu(display("Internal error"))]
88 Internal {
89 reason: String,
90 #[snafu(implicit)]
91 location: Location,
92 },
93
94 #[snafu(display("Failed to eval stream"))]
96 Eval {
97 source: EvalError,
98 #[snafu(implicit)]
99 location: Location,
100 },
101
102 #[snafu(display("Table not found: {name}"))]
103 TableNotFound {
104 name: String,
105 #[snafu(implicit)]
106 location: Location,
107 },
108
109 #[snafu(display("Table not found: {msg}, meta error: {source}"))]
110 TableNotFoundMeta {
111 source: common_meta::error::Error,
112 msg: String,
113 #[snafu(implicit)]
114 location: Location,
115 },
116
117 #[snafu(display("Flow not found, id={id}"))]
118 FlowNotFound {
119 id: FlowId,
120 #[snafu(implicit)]
121 location: Location,
122 },
123
124 #[snafu(display("Failed to list flows in flownode={id:?}"))]
125 ListFlows {
126 id: Option<common_meta::FlownodeId>,
127 source: common_meta::error::Error,
128 #[snafu(implicit)]
129 location: Location,
130 },
131
132 #[snafu(display("Flow already exist, id={id}"))]
133 FlowAlreadyExist {
134 id: FlowId,
135 #[snafu(implicit)]
136 location: Location,
137 },
138
139 #[snafu(display("Failed to join task"))]
140 JoinTask {
141 #[snafu(source)]
142 error: tokio::task::JoinError,
143 #[snafu(implicit)]
144 location: Location,
145 },
146
147 #[snafu(display("Invalid query: {reason}"))]
148 InvalidQuery {
149 reason: String,
150 #[snafu(implicit)]
151 location: Location,
152 },
153
154 #[snafu(display("Not implement in flow: {reason}"))]
155 NotImplemented {
156 reason: String,
157 #[snafu(implicit)]
158 location: Location,
159 },
160
161 #[snafu(display("Invalid auth config"))]
162 IllegalAuthConfig { source: auth::error::Error },
163
164 #[snafu(display("Flow plan error: {reason}"))]
165 Plan {
166 reason: String,
167 #[snafu(implicit)]
168 location: Location,
169 },
170
171 #[snafu(display("Unsupported: {reason}"))]
172 Unsupported {
173 reason: String,
174 #[snafu(implicit)]
175 location: Location,
176 },
177
178 #[snafu(display("Unsupported temporal filter: {reason}"))]
179 UnsupportedTemporalFilter {
180 reason: String,
181 #[snafu(implicit)]
182 location: Location,
183 },
184
185 #[snafu(display("Datatypes error: {source} with extra message: {extra}"))]
186 Datatypes {
187 source: datatypes::Error,
188 extra: String,
189 #[snafu(implicit)]
190 location: Location,
191 },
192
193 #[snafu(display("Arrow error: {raw:?} in context: {context}"))]
194 Arrow {
195 #[snafu(source)]
196 raw: ArrowError,
197 context: String,
198 #[snafu(implicit)]
199 location: Location,
200 },
201
202 #[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
203 Datafusion {
204 #[snafu(source)]
205 raw: datafusion_common::DataFusionError,
206 context: String,
207 #[snafu(implicit)]
208 location: Location,
209 },
210
211 #[snafu(display("Unexpected: {reason}"))]
212 Unexpected {
213 reason: String,
214 #[snafu(implicit)]
215 location: Location,
216 },
217
218 #[snafu(display("Illegal check task state: {reason}"))]
219 IllegalCheckTaskState {
220 reason: String,
221 #[snafu(implicit)]
222 location: Location,
223 },
224
225 #[snafu(display(
226 "Failed to sync with check task for flow {} with allow_drop={}",
227 flow_id,
228 allow_drop
229 ))]
230 SyncCheckTask {
231 flow_id: FlowId,
232 allow_drop: bool,
233 #[snafu(implicit)]
234 location: Location,
235 },
236
237 #[snafu(display("Failed to start server"))]
238 StartServer {
239 #[snafu(implicit)]
240 location: Location,
241 source: servers::error::Error,
242 },
243
244 #[snafu(display("Failed to shutdown server"))]
245 ShutdownServer {
246 #[snafu(implicit)]
247 location: Location,
248 source: servers::error::Error,
249 },
250
251 #[snafu(display("Failed to initialize meta client"))]
252 MetaClientInit {
253 #[snafu(implicit)]
254 location: Location,
255 source: meta_client::error::Error,
256 },
257
258 #[snafu(display("Failed to parse address {}", addr))]
259 ParseAddr {
260 addr: String,
261 #[snafu(source)]
262 error: std::net::AddrParseError,
263 },
264
265 #[snafu(display("Failed to get cache from cache registry: {}", name))]
266 CacheRequired {
267 #[snafu(implicit)]
268 location: Location,
269 name: String,
270 },
271
272 #[snafu(display("Invalid request: {context}"))]
273 InvalidRequest {
274 context: String,
275 source: client::Error,
276 #[snafu(implicit)]
277 location: Location,
278 },
279
280 #[snafu(display("Failed to encode logical plan in substrait"))]
281 SubstraitEncodeLogicalPlan {
282 #[snafu(implicit)]
283 location: Location,
284 source: substrait::error::Error,
285 },
286
287 #[snafu(display("Failed to convert column schema to proto column def"))]
288 ConvertColumnSchema {
289 #[snafu(implicit)]
290 location: Location,
291 source: operator::error::Error,
292 },
293}
294
295pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
297 let msg = err.to_string();
298 let last_err_msg = common_error::ext::StackError::last(&err).to_string();
299 let code = err.status_code() as u32;
300 let header = from_err_code_msg_to_header(code, &last_err_msg);
301
302 tonic::Status::with_metadata(
303 tonic::Code::InvalidArgument,
304 msg,
305 MetadataMap::from_headers(header),
306 )
307}
308
/// Shorthand for a [`std::result::Result`] whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
311
312impl ErrorExt for Error {
313 fn status_code(&self) -> StatusCode {
314 match self {
315 Self::Eval { .. }
316 | Self::JoinTask { .. }
317 | Self::Datafusion { .. }
318 | Self::InsertIntoFlow { .. }
319 | Self::NoAvailableFrontend { .. }
320 | Self::FlowNotRecovered { .. } => StatusCode::Internal,
321 Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
322 Self::TableNotFound { .. }
323 | Self::TableNotFoundMeta { .. }
324 | Self::FlowNotFound { .. }
325 | Self::ListFlows { .. } => StatusCode::TableNotFound,
326 Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
327 Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
328 StatusCode::EngineExecuteQuery
329 }
330 Self::Unexpected { .. }
331 | Self::SyncCheckTask { .. }
332 | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
333 Self::NotImplemented { .. }
334 | Self::UnsupportedTemporalFilter { .. }
335 | Self::Unsupported { .. } => StatusCode::Unsupported,
336 Self::External { source, .. } => source.status_code(),
337 Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
338 Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
339 source.status_code()
340 }
341 Self::MetaClientInit { source, .. } => source.status_code(),
342
343 Self::InvalidQuery { .. }
344 | Self::InvalidRequest { .. }
345 | Self::ParseAddr { .. }
346 | Self::IllegalAuthConfig { .. } => StatusCode::InvalidArguments,
347
348 Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
349
350 Error::ConvertColumnSchema { source, .. } => source.status_code(),
351 }
352 }
353
354 fn as_any(&self) -> &dyn Any {
355 self
356 }
357}
358
// NOTE(review): presumably generates the conversion from `Error` into
// `tonic::Status`; confirm against the macro definition in `common_error`.
define_into_tonic_status!(Error);
360
361impl From<EvalError> for Error {
362 fn from(e: EvalError) -> Self {
363 Err::<(), _>(e).context(EvalSnafu).unwrap_err()
364 }
365}