datanode/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_error::define_into_tonic_status;
19use common_error::ext::{BoxedError, ErrorExt};
20use common_error::status_code::StatusCode;
21use common_macro::stack_trace_debug;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::error::Error as TableError;
25use tokio::time::error::Elapsed;
26
27/// Business error of datanode.
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Failed to execute async task"))]
33    AsyncTaskExecute {
34        #[snafu(implicit)]
35        location: Location,
36        source: Arc<Error>,
37    },
38
39    #[snafu(display("Failed to watch change"))]
40    WatchAsyncTaskChange {
41        #[snafu(implicit)]
42        location: Location,
43        #[snafu(source)]
44        error: tokio::sync::watch::error::RecvError,
45    },
46
47    #[snafu(display("Failed to handle heartbeat response"))]
48    HandleHeartbeatResponse {
49        #[snafu(implicit)]
50        location: Location,
51        source: common_meta::error::Error,
52    },
53
54    #[snafu(display("Failed to get info from meta server"))]
55    GetMetadata {
56        #[snafu(implicit)]
57        location: Location,
58        source: common_meta::error::Error,
59    },
60
61    #[snafu(display("Failed to execute logical plan"))]
62    ExecuteLogicalPlan {
63        #[snafu(implicit)]
64        location: Location,
65        source: query::error::Error,
66    },
67
68    #[snafu(display("Failed to create plan decoder"))]
69    NewPlanDecoder {
70        #[snafu(implicit)]
71        location: Location,
72        source: query::error::Error,
73    },
74
75    #[snafu(display("Failed to decode logical plan"))]
76    DecodeLogicalPlan {
77        #[snafu(implicit)]
78        location: Location,
79        source: common_query::error::Error,
80    },
81
82    #[snafu(display("Schema not found: {}", name))]
83    SchemaNotFound {
84        name: String,
85        #[snafu(implicit)]
86        location: Location,
87    },
88
89    #[snafu(display("Missing timestamp column in request"))]
90    MissingTimestampColumn {
91        #[snafu(implicit)]
92        location: Location,
93    },
94
95    #[snafu(display("Failed to delete value from table: {}", table_name))]
96    Delete {
97        table_name: String,
98        #[snafu(implicit)]
99        location: Location,
100        source: TableError,
101    },
102
103    #[snafu(display("Failed to start server"))]
104    StartServer {
105        #[snafu(implicit)]
106        location: Location,
107        source: servers::error::Error,
108    },
109
110    #[snafu(display("Failed to parse address {}", addr))]
111    ParseAddr {
112        addr: String,
113        #[snafu(source)]
114        error: std::net::AddrParseError,
115    },
116
117    #[snafu(display("Failed to create directory {}", dir))]
118    CreateDir {
119        dir: String,
120        #[snafu(source)]
121        error: std::io::Error,
122    },
123
124    #[snafu(display("Failed to remove directory {}", dir))]
125    RemoveDir {
126        dir: String,
127        #[snafu(source)]
128        error: std::io::Error,
129    },
130
131    #[snafu(display("Failed to open log store"))]
132    OpenLogStore {
133        #[snafu(implicit)]
134        location: Location,
135        source: Box<log_store::error::Error>,
136    },
137
138    #[snafu(display("Invalid SQL, error: {}", msg))]
139    InvalidSql { msg: String },
140
141    #[snafu(display("Illegal primary keys definition: {}", msg))]
142    IllegalPrimaryKeysDef {
143        msg: String,
144        #[snafu(implicit)]
145        location: Location,
146    },
147
148    #[snafu(display("Schema {} already exists", name))]
149    SchemaExists {
150        name: String,
151        #[snafu(implicit)]
152        location: Location,
153    },
154
155    #[snafu(display("Failed to initialize meta client"))]
156    MetaClientInit {
157        #[snafu(implicit)]
158        location: Location,
159        source: meta_client::error::Error,
160    },
161
162    #[snafu(display("Missing node id in Datanode config"))]
163    MissingNodeId {
164        #[snafu(implicit)]
165        location: Location,
166    },
167
168    #[snafu(display("Failed to build http client"))]
169    BuildHttpClient {
170        #[snafu(implicit)]
171        location: Location,
172        #[snafu(source)]
173        error: reqwest::Error,
174    },
175
176    #[snafu(display("Missing required field: {}", name))]
177    MissingRequiredField {
178        name: String,
179        #[snafu(implicit)]
180        location: Location,
181    },
182
183    #[snafu(display(
184        "No valid default value can be built automatically, column: {}",
185        column,
186    ))]
187    ColumnNoneDefaultValue {
188        column: String,
189        #[snafu(implicit)]
190        location: Location,
191    },
192
193    #[snafu(display("Failed to shutdown server"))]
194    ShutdownServer {
195        #[snafu(implicit)]
196        location: Location,
197        source: servers::error::Error,
198    },
199
200    #[snafu(display("Failed to shutdown instance"))]
201    ShutdownInstance {
202        #[snafu(implicit)]
203        location: Location,
204        source: BoxedError,
205    },
206
207    #[snafu(display("Payload not exist"))]
208    PayloadNotExist {
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Unexpected, violated: {}", violated))]
214    Unexpected {
215        violated: String,
216        #[snafu(implicit)]
217        location: Location,
218    },
219
220    #[snafu(display("Failed to handle request for region {}", region_id))]
221    HandleRegionRequest {
222        region_id: RegionId,
223        #[snafu(implicit)]
224        location: Location,
225        source: BoxedError,
226    },
227
228    #[snafu(display("Failed to open batch regions"))]
229    HandleBatchOpenRequest {
230        #[snafu(implicit)]
231        location: Location,
232        source: BoxedError,
233    },
234
235    #[snafu(display("Failed to handle batch ddl request, ddl_type: {}", ddl_type))]
236    HandleBatchDdlRequest {
237        #[snafu(implicit)]
238        location: Location,
239        source: BoxedError,
240        ddl_type: String,
241    },
242
243    #[snafu(display("RegionId {} not found", region_id))]
244    RegionNotFound {
245        region_id: RegionId,
246        #[snafu(implicit)]
247        location: Location,
248    },
249
250    #[snafu(display("Region {} not ready", region_id))]
251    RegionNotReady {
252        region_id: RegionId,
253        #[snafu(implicit)]
254        location: Location,
255    },
256
257    #[snafu(display("Region {} is busy", region_id))]
258    RegionBusy {
259        region_id: RegionId,
260        #[snafu(implicit)]
261        location: Location,
262    },
263
264    #[snafu(display("Region engine {} is not registered", name))]
265    RegionEngineNotFound {
266        name: String,
267        #[snafu(implicit)]
268        location: Location,
269    },
270
271    #[snafu(display("Unsupported output type, expected: {}", expected))]
272    UnsupportedOutput {
273        expected: String,
274        #[snafu(implicit)]
275        location: Location,
276    },
277
278    #[snafu(display("Failed to build region requests"))]
279    BuildRegionRequests {
280        #[snafu(implicit)]
281        location: Location,
282        source: store_api::metadata::MetadataError,
283    },
284
285    #[snafu(display("Failed to stop region engine {}", name))]
286    StopRegionEngine {
287        name: String,
288        #[snafu(implicit)]
289        location: Location,
290        source: BoxedError,
291    },
292
293    #[snafu(display(
294        "Failed to find logical regions in physical region {}",
295        physical_region_id
296    ))]
297    FindLogicalRegions {
298        physical_region_id: RegionId,
299        source: metric_engine::error::Error,
300        #[snafu(implicit)]
301        location: Location,
302    },
303
304    #[snafu(display("Failed to build mito engine"))]
305    BuildMitoEngine {
306        source: mito2::error::Error,
307        #[snafu(implicit)]
308        location: Location,
309    },
310
311    #[snafu(display("Failed to build metric engine"))]
312    BuildMetricEngine {
313        source: metric_engine::error::Error,
314        #[snafu(implicit)]
315        location: Location,
316    },
317
318    #[snafu(display("Failed to serialize options to TOML"))]
319    TomlFormat {
320        #[snafu(implicit)]
321        location: Location,
322        #[snafu(source(from(common_config::error::Error, Box::new)))]
323        source: Box<common_config::error::Error>,
324    },
325
326    #[snafu(display(
327        "Failed to get region metadata from engine {} for region_id {}",
328        engine,
329        region_id,
330    ))]
331    GetRegionMetadata {
332        engine: String,
333        region_id: RegionId,
334        #[snafu(implicit)]
335        location: Location,
336        source: BoxedError,
337    },
338
339    #[snafu(display("DataFusion"))]
340    DataFusion {
341        #[snafu(source)]
342        error: datafusion::error::DataFusionError,
343        #[snafu(implicit)]
344        location: Location,
345    },
346
347    #[snafu(display("Failed to acquire permit, source closed"))]
348    ConcurrentQueryLimiterClosed {
349        #[snafu(source)]
350        error: tokio::sync::AcquireError,
351        #[snafu(implicit)]
352        location: Location,
353    },
354
355    #[snafu(display("Failed to acquire permit under timeouts"))]
356    ConcurrentQueryLimiterTimeout {
357        #[snafu(source)]
358        error: Elapsed,
359        #[snafu(implicit)]
360        location: Location,
361    },
362
363    #[snafu(display("Cache not found in registry"))]
364    MissingCache {
365        #[snafu(implicit)]
366        location: Location,
367    },
368
369    #[snafu(display("Failed to serialize json"))]
370    SerializeJson {
371        #[snafu(source)]
372        error: serde_json::Error,
373        #[snafu(implicit)]
374        location: Location,
375    },
376
377    #[snafu(display("Failed object store operation"))]
378    ObjectStore {
379        source: object_store::error::Error,
380        #[snafu(implicit)]
381        location: Location,
382    },
383
384    #[snafu(display("Failed to build cache store"))]
385    BuildCacheStore {
386        #[snafu(source)]
387        error: object_store::Error,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("Not yet implemented: {what}"))]
393    NotYetImplemented { what: String },
394}
395
396pub type Result<T> = std::result::Result<T, Error>;
397
398impl ErrorExt for Error {
399    fn status_code(&self) -> StatusCode {
400        use Error::*;
401        match self {
402            NewPlanDecoder { source, .. } | ExecuteLogicalPlan { source, .. } => {
403                source.status_code()
404            }
405
406            BuildRegionRequests { source, .. } => source.status_code(),
407            HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => {
408                source.status_code()
409            }
410
411            DecodeLogicalPlan { source, .. } => source.status_code(),
412
413            Delete { source, .. } => source.status_code(),
414
415            InvalidSql { .. }
416            | IllegalPrimaryKeysDef { .. }
417            | MissingTimestampColumn { .. }
418            | SchemaNotFound { .. }
419            | SchemaExists { .. }
420            | MissingNodeId { .. }
421            | ColumnNoneDefaultValue { .. }
422            | MissingRequiredField { .. }
423            | RegionEngineNotFound { .. }
424            | ParseAddr { .. }
425            | TomlFormat { .. } => StatusCode::InvalidArguments,
426
427            PayloadNotExist { .. }
428            | Unexpected { .. }
429            | WatchAsyncTaskChange { .. }
430            | BuildHttpClient { .. } => StatusCode::Unexpected,
431
432            AsyncTaskExecute { source, .. } => source.status_code(),
433
434            CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
435                StatusCode::Internal
436            }
437
438            RegionNotFound { .. } => StatusCode::RegionNotFound,
439            RegionNotReady { .. } => StatusCode::RegionNotReady,
440            RegionBusy { .. } => StatusCode::RegionBusy,
441
442            StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),
443
444            OpenLogStore { source, .. } => source.status_code(),
445            MetaClientInit { source, .. } => source.status_code(),
446            UnsupportedOutput { .. } | NotYetImplemented { .. } => StatusCode::Unsupported,
447            HandleRegionRequest { source, .. }
448            | GetRegionMetadata { source, .. }
449            | HandleBatchOpenRequest { source, .. }
450            | HandleBatchDdlRequest { source, .. } => source.status_code(),
451            StopRegionEngine { source, .. } => source.status_code(),
452
453            FindLogicalRegions { source, .. } => source.status_code(),
454            BuildMitoEngine { source, .. } => source.status_code(),
455            BuildMetricEngine { source, .. } => source.status_code(),
456            ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
457                StatusCode::RegionBusy
458            }
459            MissingCache { .. } => StatusCode::Internal,
460            SerializeJson { .. } => StatusCode::Internal,
461
462            ObjectStore { source, .. } => source.status_code(),
463            BuildCacheStore { .. } => StatusCode::StorageUnavailable,
464        }
465    }
466
467    fn as_any(&self) -> &dyn Any {
468        self
469    }
470}
471
472define_into_tonic_status!(Error);