1use std::any::Any;
18
19use api::v1::CreateTableExpr;
20use arrow_schema::ArrowError;
21use common_error::ext::BoxedError;
22use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
23use common_macro::stack_trace_debug;
24use common_telemetry::common_error::ext::ErrorExt;
25use common_telemetry::common_error::status_code::StatusCode;
26use snafu::{Location, ResultExt, Snafu};
27use tonic::metadata::MetadataMap;
28
29use crate::FlowId;
30use crate::expr::EvalError;
31
32#[derive(Snafu)]
34#[snafu(visibility(pub))]
35#[stack_trace_debug]
36pub enum Error {
37 #[snafu(display(
38 "Failed to insert into flow: region_id={}, flow_ids={:?}",
39 region_id,
40 flow_ids
41 ))]
42 InsertIntoFlow {
43 region_id: u64,
44 flow_ids: Vec<u64>,
45 source: BoxedError,
46 #[snafu(implicit)]
47 location: Location,
48 },
49
50 #[snafu(display("Flow engine is still recovering"))]
51 FlowNotRecovered {
52 #[snafu(implicit)]
53 location: Location,
54 },
55
56 #[snafu(display("Error encountered while creating flow: {sql}"))]
57 CreateFlow {
58 sql: String,
59 source: BoxedError,
60 #[snafu(implicit)]
61 location: Location,
62 },
63
64 #[snafu(display("Error encountered while creating sink table for flow: {create:?}"))]
65 CreateSinkTable {
66 create: CreateTableExpr,
67 source: BoxedError,
68 #[snafu(implicit)]
69 location: Location,
70 },
71
72 #[snafu(display("Time error"))]
73 Time {
74 source: common_time::error::Error,
75 #[snafu(implicit)]
76 location: Location,
77 },
78
79 #[snafu(display("No available frontend found after timeout: {timeout:?}, context: {context}"))]
80 NoAvailableFrontend {
81 timeout: std::time::Duration,
82 context: String,
83 #[snafu(implicit)]
84 location: Location,
85 },
86
87 #[snafu(display("External error"))]
88 External {
89 source: BoxedError,
90 #[snafu(implicit)]
91 location: Location,
92 },
93
94 #[snafu(display("Internal error"))]
95 Internal {
96 reason: String,
97 #[snafu(implicit)]
98 location: Location,
99 },
100
101 #[snafu(display("Failed to eval stream"))]
103 Eval {
104 source: EvalError,
105 #[snafu(implicit)]
106 location: Location,
107 },
108
109 #[snafu(display("Table not found: {name}"))]
110 TableNotFound {
111 name: String,
112 #[snafu(implicit)]
113 location: Location,
114 },
115
116 #[snafu(display("Table not found: {msg}, meta error: {source}"))]
117 TableNotFoundMeta {
118 source: common_meta::error::Error,
119 msg: String,
120 #[snafu(implicit)]
121 location: Location,
122 },
123
124 #[snafu(display("Flow not found, id={id}"))]
125 FlowNotFound {
126 id: FlowId,
127 #[snafu(implicit)]
128 location: Location,
129 },
130
131 #[snafu(display("Failed to list flows in flownode={id:?}"))]
132 ListFlows {
133 id: Option<common_meta::FlownodeId>,
134 source: common_meta::error::Error,
135 #[snafu(implicit)]
136 location: Location,
137 },
138
139 #[snafu(display("Flow already exist, id={id}"))]
140 FlowAlreadyExist {
141 id: FlowId,
142 #[snafu(implicit)]
143 location: Location,
144 },
145
146 #[snafu(display("Failed to join task"))]
147 JoinTask {
148 #[snafu(source)]
149 error: tokio::task::JoinError,
150 #[snafu(implicit)]
151 location: Location,
152 },
153
154 #[snafu(display("Invalid query: {reason}"))]
155 InvalidQuery {
156 reason: String,
157 #[snafu(implicit)]
158 location: Location,
159 },
160
161 #[snafu(display("Not implement in flow: {reason}"))]
162 NotImplemented {
163 reason: String,
164 #[snafu(implicit)]
165 location: Location,
166 },
167
168 #[snafu(display("Invalid auth config"))]
169 IllegalAuthConfig { source: auth::error::Error },
170
171 #[snafu(display("Flow plan error: {reason}"))]
172 Plan {
173 reason: String,
174 #[snafu(implicit)]
175 location: Location,
176 },
177
178 #[snafu(display("Unsupported: {reason}"))]
179 Unsupported {
180 reason: String,
181 #[snafu(implicit)]
182 location: Location,
183 },
184
185 #[snafu(display("Unsupported temporal filter: {reason}"))]
186 UnsupportedTemporalFilter {
187 reason: String,
188 #[snafu(implicit)]
189 location: Location,
190 },
191
192 #[snafu(display("Datatypes error: {source} with extra message: {extra}"))]
193 Datatypes {
194 source: datatypes::Error,
195 extra: String,
196 #[snafu(implicit)]
197 location: Location,
198 },
199
200 #[snafu(display("Arrow error: {raw:?} in context: {context}"))]
201 Arrow {
202 #[snafu(source)]
203 raw: ArrowError,
204 context: String,
205 #[snafu(implicit)]
206 location: Location,
207 },
208
209 #[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
210 Datafusion {
211 #[snafu(source)]
212 raw: datafusion_common::DataFusionError,
213 context: String,
214 #[snafu(implicit)]
215 location: Location,
216 },
217
218 #[snafu(display("Unexpected: {reason}"))]
219 Unexpected {
220 reason: String,
221 #[snafu(implicit)]
222 location: Location,
223 },
224
225 #[snafu(display("Illegal check task state: {reason}"))]
226 IllegalCheckTaskState {
227 reason: String,
228 #[snafu(implicit)]
229 location: Location,
230 },
231
232 #[snafu(display(
233 "Failed to sync with check task for flow {} with allow_drop={}",
234 flow_id,
235 allow_drop
236 ))]
237 SyncCheckTask {
238 flow_id: FlowId,
239 allow_drop: bool,
240 #[snafu(implicit)]
241 location: Location,
242 },
243
244 #[snafu(display("Failed to start server"))]
245 StartServer {
246 #[snafu(implicit)]
247 location: Location,
248 source: servers::error::Error,
249 },
250
251 #[snafu(display("Failed to shutdown server"))]
252 ShutdownServer {
253 #[snafu(implicit)]
254 location: Location,
255 source: servers::error::Error,
256 },
257
258 #[snafu(display("Failed to initialize meta client"))]
259 MetaClientInit {
260 #[snafu(implicit)]
261 location: Location,
262 source: meta_client::error::Error,
263 },
264
265 #[snafu(display("Failed to parse address {}", addr))]
266 ParseAddr {
267 addr: String,
268 #[snafu(source)]
269 error: std::net::AddrParseError,
270 },
271
272 #[snafu(display("Failed to get cache from cache registry: {}", name))]
273 CacheRequired {
274 #[snafu(implicit)]
275 location: Location,
276 name: String,
277 },
278
279 #[snafu(display("Invalid request: {context}"))]
280 InvalidRequest {
281 context: String,
282 source: client::Error,
283 #[snafu(implicit)]
284 location: Location,
285 },
286
287 #[snafu(display("Failed to encode logical plan in substrait"))]
288 SubstraitEncodeLogicalPlan {
289 #[snafu(implicit)]
290 location: Location,
291 source: substrait::error::Error,
292 },
293
294 #[snafu(display("Failed to convert column schema to proto column def"))]
295 ConvertColumnSchema {
296 #[snafu(implicit)]
297 location: Location,
298 source: operator::error::Error,
299 },
300
301 #[snafu(display("Failed to create channel manager for gRPC client"))]
302 InvalidClientConfig {
303 #[snafu(implicit)]
304 location: Location,
305 source: common_grpc::error::Error,
306 },
307}
308
309pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
311 let msg = err.to_string();
312 let last_err_msg = common_error::ext::StackError::last(&err).to_string();
313 let code = err.status_code() as u32;
314 let header = from_err_code_msg_to_header(code, &last_err_msg);
315
316 tonic::Status::with_metadata(
317 tonic::Code::InvalidArgument,
318 msg,
319 MetadataMap::from_headers(header),
320 )
321}
322
323pub type Result<T> = std::result::Result<T, Error>;
325
326impl ErrorExt for Error {
327 fn status_code(&self) -> StatusCode {
328 match self {
329 Self::Eval { .. }
330 | Self::JoinTask { .. }
331 | Self::Datafusion { .. }
332 | Self::InsertIntoFlow { .. }
333 | Self::NoAvailableFrontend { .. }
334 | Self::FlowNotRecovered { .. } => StatusCode::Internal,
335 Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
336 Self::TableNotFound { .. }
337 | Self::TableNotFoundMeta { .. }
338 | Self::ListFlows { .. } => StatusCode::TableNotFound,
339 Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
340 Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
341 Self::CreateFlow { .. }
342 | Self::CreateSinkTable { .. }
343 | Self::Arrow { .. }
344 | Self::Time { .. } => StatusCode::EngineExecuteQuery,
345 Self::Unexpected { .. }
346 | Self::SyncCheckTask { .. }
347 | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
348 Self::NotImplemented { .. }
349 | Self::UnsupportedTemporalFilter { .. }
350 | Self::Unsupported { .. } => StatusCode::Unsupported,
351 Self::External { source, .. } => source.status_code(),
352 Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal,
353 Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
354 source.status_code()
355 }
356 Self::MetaClientInit { source, .. } => source.status_code(),
357
358 Self::InvalidQuery { .. }
359 | Self::InvalidRequest { .. }
360 | Self::ParseAddr { .. }
361 | Self::IllegalAuthConfig { .. }
362 | Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
363
364 Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
365
366 Error::ConvertColumnSchema { source, .. } => source.status_code(),
367 }
368 }
369
370 fn as_any(&self) -> &dyn Any {
371 self
372 }
373}
374
375define_into_tonic_status!(Error);
376
377impl From<EvalError> for Error {
378 fn from(e: EvalError) -> Self {
379 Err::<(), _>(e).context(EvalSnafu).unwrap_err()
380 }
381}