datanode/
error.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_error::define_into_tonic_status;
19use common_error::ext::{BoxedError, ErrorExt};
20use common_error::status_code::StatusCode;
21use common_macro::stack_trace_debug;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::error::Error as TableError;
25use tokio::time::error::Elapsed;
26
27/// Business error of datanode.
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Failed to execute async task"))]
33    AsyncTaskExecute {
34        #[snafu(implicit)]
35        location: Location,
36        source: Arc<Error>,
37    },
38
39    #[snafu(display("Failed to watch change"))]
40    WatchAsyncTaskChange {
41        #[snafu(implicit)]
42        location: Location,
43        #[snafu(source)]
44        error: tokio::sync::watch::error::RecvError,
45    },
46
47    #[snafu(display("Failed to handle heartbeat response"))]
48    HandleHeartbeatResponse {
49        #[snafu(implicit)]
50        location: Location,
51        source: common_meta::error::Error,
52    },
53
54    #[snafu(display("Failed to get info from meta server"))]
55    GetMetadata {
56        #[snafu(implicit)]
57        location: Location,
58        source: common_meta::error::Error,
59    },
60
61    #[snafu(display("Failed to execute logical plan"))]
62    ExecuteLogicalPlan {
63        #[snafu(implicit)]
64        location: Location,
65        source: query::error::Error,
66    },
67
68    #[snafu(display("Failed to create plan decoder"))]
69    NewPlanDecoder {
70        #[snafu(implicit)]
71        location: Location,
72        source: query::error::Error,
73    },
74
75    #[snafu(display("Failed to decode logical plan"))]
76    DecodeLogicalPlan {
77        #[snafu(implicit)]
78        location: Location,
79        source: common_query::error::Error,
80    },
81
82    #[snafu(display("Catalog not found: {}", name))]
83    CatalogNotFound {
84        name: String,
85        #[snafu(implicit)]
86        location: Location,
87    },
88
89    #[snafu(display("Schema not found: {}", name))]
90    SchemaNotFound {
91        name: String,
92        #[snafu(implicit)]
93        location: Location,
94    },
95
96    #[snafu(display("Missing timestamp column in request"))]
97    MissingTimestampColumn {
98        #[snafu(implicit)]
99        location: Location,
100    },
101
102    #[snafu(display("Failed to delete value from table: {}", table_name))]
103    Delete {
104        table_name: String,
105        #[snafu(implicit)]
106        location: Location,
107        source: TableError,
108    },
109
110    #[snafu(display("Failed to start server"))]
111    StartServer {
112        #[snafu(implicit)]
113        location: Location,
114        source: servers::error::Error,
115    },
116
117    #[snafu(display("Failed to parse address {}", addr))]
118    ParseAddr {
119        addr: String,
120        #[snafu(source)]
121        error: std::net::AddrParseError,
122    },
123
124    #[snafu(display("Failed to create directory {}", dir))]
125    CreateDir {
126        dir: String,
127        #[snafu(source)]
128        error: std::io::Error,
129    },
130
131    #[snafu(display("Failed to remove directory {}", dir))]
132    RemoveDir {
133        dir: String,
134        #[snafu(source)]
135        error: std::io::Error,
136    },
137
138    #[snafu(display("Failed to open log store"))]
139    OpenLogStore {
140        #[snafu(implicit)]
141        location: Location,
142        source: Box<log_store::error::Error>,
143    },
144
145    #[snafu(display("Invalid SQL, error: {}", msg))]
146    InvalidSql { msg: String },
147
148    #[snafu(display("Illegal primary keys definition: {}", msg))]
149    IllegalPrimaryKeysDef {
150        msg: String,
151        #[snafu(implicit)]
152        location: Location,
153    },
154
155    #[snafu(display("Schema {} already exists", name))]
156    SchemaExists {
157        name: String,
158        #[snafu(implicit)]
159        location: Location,
160    },
161
162    #[snafu(display("Failed to access catalog"))]
163    Catalog {
164        #[snafu(implicit)]
165        location: Location,
166        source: catalog::error::Error,
167    },
168
169    #[snafu(display("Failed to initialize meta client"))]
170    MetaClientInit {
171        #[snafu(implicit)]
172        location: Location,
173        source: meta_client::error::Error,
174    },
175
176    #[snafu(display("Missing node id in Datanode config"))]
177    MissingNodeId {
178        #[snafu(implicit)]
179        location: Location,
180    },
181
182    #[snafu(display("Failed to build http client"))]
183    BuildHttpClient {
184        #[snafu(implicit)]
185        location: Location,
186        #[snafu(source)]
187        error: reqwest::Error,
188    },
189
190    #[snafu(display("Missing required field: {}", name))]
191    MissingRequiredField {
192        name: String,
193        #[snafu(implicit)]
194        location: Location,
195    },
196
197    #[snafu(display(
198        "No valid default value can be built automatically, column: {}",
199        column,
200    ))]
201    ColumnNoneDefaultValue {
202        column: String,
203        #[snafu(implicit)]
204        location: Location,
205    },
206
207    #[snafu(display("Failed to shutdown server"))]
208    ShutdownServer {
209        #[snafu(implicit)]
210        location: Location,
211        source: servers::error::Error,
212    },
213
214    #[snafu(display("Failed to shutdown instance"))]
215    ShutdownInstance {
216        #[snafu(implicit)]
217        location: Location,
218        source: BoxedError,
219    },
220
221    #[snafu(display("Payload not exist"))]
222    PayloadNotExist {
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Unexpected, violated: {}", violated))]
228    Unexpected {
229        violated: String,
230        #[snafu(implicit)]
231        location: Location,
232    },
233
234    #[snafu(display("Failed to handle request for region {}", region_id))]
235    HandleRegionRequest {
236        region_id: RegionId,
237        #[snafu(implicit)]
238        location: Location,
239        source: BoxedError,
240    },
241
242    #[snafu(display("Failed to open batch regions"))]
243    HandleBatchOpenRequest {
244        #[snafu(implicit)]
245        location: Location,
246        source: BoxedError,
247    },
248
249    #[snafu(display("Failed to handle batch ddl request, ddl_type: {}", ddl_type))]
250    HandleBatchDdlRequest {
251        #[snafu(implicit)]
252        location: Location,
253        source: BoxedError,
254        ddl_type: String,
255    },
256
257    #[snafu(display("RegionId {} not found", region_id))]
258    RegionNotFound {
259        region_id: RegionId,
260        #[snafu(implicit)]
261        location: Location,
262    },
263
264    #[snafu(display("Region {} not ready", region_id))]
265    RegionNotReady {
266        region_id: RegionId,
267        #[snafu(implicit)]
268        location: Location,
269    },
270
271    #[snafu(display("Region {} is busy", region_id))]
272    RegionBusy {
273        region_id: RegionId,
274        #[snafu(implicit)]
275        location: Location,
276    },
277
278    #[snafu(display("Region engine {} is not registered", name))]
279    RegionEngineNotFound {
280        name: String,
281        #[snafu(implicit)]
282        location: Location,
283    },
284
285    #[snafu(display("Unsupported output type, expected: {}", expected))]
286    UnsupportedOutput {
287        expected: String,
288        #[snafu(implicit)]
289        location: Location,
290    },
291
292    #[snafu(display("Failed to build region requests"))]
293    BuildRegionRequests {
294        #[snafu(implicit)]
295        location: Location,
296        source: store_api::metadata::MetadataError,
297    },
298
299    #[snafu(display("Failed to stop region engine {}", name))]
300    StopRegionEngine {
301        name: String,
302        #[snafu(implicit)]
303        location: Location,
304        source: BoxedError,
305    },
306
307    #[snafu(display(
308        "Failed to find logical regions in physical region {}",
309        physical_region_id
310    ))]
311    FindLogicalRegions {
312        physical_region_id: RegionId,
313        source: metric_engine::error::Error,
314        #[snafu(implicit)]
315        location: Location,
316    },
317
318    #[snafu(display("Failed to build mito engine"))]
319    BuildMitoEngine {
320        source: mito2::error::Error,
321        #[snafu(implicit)]
322        location: Location,
323    },
324
325    #[snafu(display("Failed to build metric engine"))]
326    BuildMetricEngine {
327        source: metric_engine::error::Error,
328        #[snafu(implicit)]
329        location: Location,
330    },
331
332    #[snafu(display("Failed to serialize options to TOML"))]
333    TomlFormat {
334        #[snafu(implicit)]
335        location: Location,
336        #[snafu(source(from(common_config::error::Error, Box::new)))]
337        source: Box<common_config::error::Error>,
338    },
339
340    #[snafu(display(
341        "Failed to get region metadata from engine {} for region_id {}",
342        engine,
343        region_id,
344    ))]
345    GetRegionMetadata {
346        engine: String,
347        region_id: RegionId,
348        #[snafu(implicit)]
349        location: Location,
350        source: BoxedError,
351    },
352
353    #[snafu(display("DataFusion"))]
354    DataFusion {
355        #[snafu(source)]
356        error: datafusion::error::DataFusionError,
357        #[snafu(implicit)]
358        location: Location,
359    },
360
361    #[snafu(display("Failed to acquire permit, source closed"))]
362    ConcurrentQueryLimiterClosed {
363        #[snafu(source)]
364        error: tokio::sync::AcquireError,
365        #[snafu(implicit)]
366        location: Location,
367    },
368
369    #[snafu(display("Failed to acquire permit under timeouts"))]
370    ConcurrentQueryLimiterTimeout {
371        #[snafu(source)]
372        error: Elapsed,
373        #[snafu(implicit)]
374        location: Location,
375    },
376
377    #[snafu(display("Cache not found in registry"))]
378    MissingCache {
379        #[snafu(implicit)]
380        location: Location,
381    },
382
383    #[snafu(display("Failed to serialize json"))]
384    SerializeJson {
385        #[snafu(source)]
386        error: serde_json::Error,
387        #[snafu(implicit)]
388        location: Location,
389    },
390
391    #[snafu(display("Failed object store operation"))]
392    ObjectStore {
393        source: object_store::error::Error,
394        #[snafu(implicit)]
395        location: Location,
396    },
397
398    #[snafu(display("Failed to build cache store"))]
399    BuildCacheStore {
400        #[snafu(source)]
401        error: object_store::Error,
402        #[snafu(implicit)]
403        location: Location,
404    },
405}
406
407pub type Result<T> = std::result::Result<T, Error>;
408
409impl ErrorExt for Error {
410    fn status_code(&self) -> StatusCode {
411        use Error::*;
412        match self {
413            NewPlanDecoder { source, .. } | ExecuteLogicalPlan { source, .. } => {
414                source.status_code()
415            }
416
417            BuildRegionRequests { source, .. } => source.status_code(),
418            HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => {
419                source.status_code()
420            }
421
422            DecodeLogicalPlan { source, .. } => source.status_code(),
423
424            Delete { source, .. } => source.status_code(),
425
426            InvalidSql { .. }
427            | IllegalPrimaryKeysDef { .. }
428            | MissingTimestampColumn { .. }
429            | CatalogNotFound { .. }
430            | SchemaNotFound { .. }
431            | SchemaExists { .. }
432            | MissingNodeId { .. }
433            | ColumnNoneDefaultValue { .. }
434            | Catalog { .. }
435            | MissingRequiredField { .. }
436            | RegionEngineNotFound { .. }
437            | ParseAddr { .. }
438            | TomlFormat { .. } => StatusCode::InvalidArguments,
439
440            PayloadNotExist { .. }
441            | Unexpected { .. }
442            | WatchAsyncTaskChange { .. }
443            | BuildHttpClient { .. } => StatusCode::Unexpected,
444
445            AsyncTaskExecute { source, .. } => source.status_code(),
446
447            CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => {
448                StatusCode::Internal
449            }
450
451            RegionNotFound { .. } => StatusCode::RegionNotFound,
452            RegionNotReady { .. } => StatusCode::RegionNotReady,
453            RegionBusy { .. } => StatusCode::RegionBusy,
454
455            StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),
456
457            OpenLogStore { source, .. } => source.status_code(),
458            MetaClientInit { source, .. } => source.status_code(),
459            UnsupportedOutput { .. } => StatusCode::Unsupported,
460            HandleRegionRequest { source, .. }
461            | GetRegionMetadata { source, .. }
462            | HandleBatchOpenRequest { source, .. }
463            | HandleBatchDdlRequest { source, .. } => source.status_code(),
464            StopRegionEngine { source, .. } => source.status_code(),
465
466            FindLogicalRegions { source, .. } => source.status_code(),
467            BuildMitoEngine { source, .. } => source.status_code(),
468            BuildMetricEngine { source, .. } => source.status_code(),
469            ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
470                StatusCode::RegionBusy
471            }
472            MissingCache { .. } => StatusCode::Internal,
473            SerializeJson { .. } => StatusCode::Internal,
474
475            ObjectStore { source, .. } => source.status_code(),
476            BuildCacheStore { .. } => StatusCode::StorageUnavailable,
477        }
478    }
479
480    fn as_any(&self) -> &dyn Any {
481        self
482    }
483}
484
485define_into_tonic_status!(Error);