catalog/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::fmt::Debug;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_query::error::datafusion_status_code;
22use datafusion::error::DataFusionError;
23use snafu::{Location, Snafu};
24
25#[derive(Snafu)]
26#[snafu(visibility(pub))]
27#[stack_trace_debug]
28pub enum Error {
29    #[snafu(display("Failed to list catalogs"))]
30    ListCatalogs {
31        #[snafu(implicit)]
32        location: Location,
33        source: BoxedError,
34    },
35
36    #[snafu(display("Failed to list {}'s schemas", catalog))]
37    ListSchemas {
38        #[snafu(implicit)]
39        location: Location,
40        catalog: String,
41        source: BoxedError,
42    },
43
44    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
45    ListTables {
46        #[snafu(implicit)]
47        location: Location,
48        catalog: String,
49        schema: String,
50        source: BoxedError,
51    },
52
53    #[snafu(display("Failed to list nodes in cluster"))]
54    ListNodes {
55        #[snafu(implicit)]
56        location: Location,
57        source: BoxedError,
58    },
59
60    #[snafu(display("Failed to region stats in cluster"))]
61    ListRegionStats {
62        #[snafu(implicit)]
63        location: Location,
64        source: BoxedError,
65    },
66
67    #[snafu(display("Failed to list flow stats"))]
68    ListFlowStats {
69        #[snafu(implicit)]
70        location: Location,
71        source: BoxedError,
72    },
73
74    #[snafu(display("Failed to list flows in catalog {catalog}"))]
75    ListFlows {
76        #[snafu(implicit)]
77        location: Location,
78        catalog: String,
79        source: BoxedError,
80    },
81
82    #[snafu(display("Flow info not found: {flow_name} in catalog {catalog_name}"))]
83    FlowInfoNotFound {
84        flow_name: String,
85        catalog_name: String,
86        #[snafu(implicit)]
87        location: Location,
88    },
89
90    #[snafu(display("Can't convert value to json, input={input}"))]
91    Json {
92        input: String,
93        #[snafu(source)]
94        error: serde_json::error::Error,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to get information extension client"))]
100    GetInformationExtension {
101        #[snafu(implicit)]
102        location: Location,
103    },
104
105    #[snafu(display("Failed to list procedures"))]
106    ListProcedures {
107        #[snafu(implicit)]
108        location: Location,
109        source: BoxedError,
110    },
111
112    #[snafu(display("Procedure id not found"))]
113    ProcedureIdNotFound {
114        #[snafu(implicit)]
115        location: Location,
116    },
117
118    #[snafu(display("convert proto data error"))]
119    ConvertProtoData {
120        #[snafu(implicit)]
121        location: Location,
122        source: BoxedError,
123    },
124
125    #[snafu(display("Failed to create table, table info: {}", table_info))]
126    CreateTable {
127        table_info: String,
128        #[snafu(implicit)]
129        location: Location,
130        source: table::error::Error,
131    },
132
133    #[snafu(display("Cannot find catalog by name: {}", catalog_name))]
134    CatalogNotFound {
135        catalog_name: String,
136        #[snafu(implicit)]
137        location: Location,
138    },
139
140    #[snafu(display("Cannot find schema {} in catalog {}", schema, catalog))]
141    SchemaNotFound {
142        catalog: String,
143        schema: String,
144        #[snafu(implicit)]
145        location: Location,
146    },
147
148    #[snafu(display("Table `{}` already exists", table))]
149    TableExists {
150        table: String,
151        #[snafu(implicit)]
152        location: Location,
153    },
154
155    #[snafu(display("Table not found: {}", table))]
156    TableNotExist {
157        table: String,
158        #[snafu(implicit)]
159        location: Location,
160    },
161
162    #[snafu(display("View info not found: {}", name))]
163    ViewInfoNotFound {
164        name: String,
165        #[snafu(implicit)]
166        location: Location,
167    },
168
169    #[snafu(display(
170        "View plan columns changed from: {} to: {}",
171        origin_names,
172        actual_names
173    ))]
174    ViewPlanColumnsChanged {
175        origin_names: String,
176        actual_names: String,
177        #[snafu(implicit)]
178        location: Location,
179    },
180
181    #[snafu(display("Partition manager not found, it's not expected."))]
182    PartitionManagerNotFound {
183        #[snafu(implicit)]
184        location: Location,
185    },
186
187    #[snafu(display("Failed to find table partitions"))]
188    FindPartitions { source: partition::error::Error },
189
190    #[snafu(display("Failed to find region routes"))]
191    FindRegionRoutes { source: partition::error::Error },
192
193    #[snafu(display("Failed to create recordbatch"))]
194    CreateRecordBatch {
195        #[snafu(implicit)]
196        location: Location,
197        source: common_recordbatch::error::Error,
198    },
199
200    #[snafu(display("Internal error"))]
201    Internal {
202        #[snafu(implicit)]
203        location: Location,
204        source: BoxedError,
205    },
206
207    #[snafu(display("Failed to upgrade weak catalog manager reference"))]
208    UpgradeWeakCatalogManagerRef {
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Failed to decode logical plan for view: {}", name))]
214    DecodePlan {
215        name: String,
216        #[snafu(implicit)]
217        location: Location,
218        source: common_query::error::Error,
219    },
220
221    #[snafu(display("Invalid table info in catalog"))]
222    InvalidTableInfoInCatalog {
223        #[snafu(implicit)]
224        location: Location,
225        source: datatypes::error::Error,
226    },
227
228    #[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
229    QueryAccessDenied { catalog: String, schema: String },
230
231    #[snafu(display("DataFusion error"))]
232    Datafusion {
233        #[snafu(source)]
234        error: DataFusionError,
235        #[snafu(implicit)]
236        location: Location,
237    },
238
239    #[snafu(display("Failed to project view columns"))]
240    ProjectViewColumns {
241        #[snafu(source)]
242        error: DataFusionError,
243        #[snafu(implicit)]
244        location: Location,
245    },
246
247    #[snafu(display("Table metadata manager error"))]
248    TableMetadataManager {
249        source: common_meta::error::Error,
250        #[snafu(implicit)]
251        location: Location,
252    },
253
254    #[snafu(display("Failed to get table cache"))]
255    GetTableCache {
256        source: common_meta::error::Error,
257        #[snafu(implicit)]
258        location: Location,
259    },
260
261    #[snafu(display("Failed to get view info from cache"))]
262    GetViewCache {
263        source: common_meta::error::Error,
264        #[snafu(implicit)]
265        location: Location,
266    },
267
268    #[snafu(display("Cache not found: {name}"))]
269    CacheNotFound {
270        name: String,
271        #[snafu(implicit)]
272        location: Location,
273    },
274
275    #[snafu(display("Failed to cast the catalog manager"))]
276    CastManager {
277        #[snafu(implicit)]
278        location: Location,
279    },
280}
281
282impl Error {
283    pub fn should_fail(&self) -> bool {
284        use Error::*;
285
286        matches!(
287            self,
288            GetViewCache { .. }
289                | ViewInfoNotFound { .. }
290                | DecodePlan { .. }
291                | ViewPlanColumnsChanged { .. }
292                | ProjectViewColumns { .. }
293        )
294    }
295}
296
297pub type Result<T> = std::result::Result<T, Error>;
298
299impl ErrorExt for Error {
300    fn status_code(&self) -> StatusCode {
301        match self {
302            Error::SchemaNotFound { .. }
303            | Error::CatalogNotFound { .. }
304            | Error::FindPartitions { .. }
305            | Error::FindRegionRoutes { .. }
306            | Error::CacheNotFound { .. }
307            | Error::CastManager { .. }
308            | Error::Json { .. }
309            | Error::GetInformationExtension { .. }
310            | Error::PartitionManagerNotFound { .. }
311            | Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected,
312
313            Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
314
315            Error::ViewInfoNotFound { .. } => StatusCode::TableNotFound,
316
317            Error::FlowInfoNotFound { .. } => StatusCode::FlowNotFound,
318
319            Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,
320
321            Error::CreateRecordBatch { source, .. } => source.status_code(),
322            Error::TableExists { .. } => StatusCode::TableAlreadyExists,
323            Error::TableNotExist { .. } => StatusCode::TableNotFound,
324            Error::ListCatalogs { source, .. }
325            | Error::ListNodes { source, .. }
326            | Error::ListSchemas { source, .. }
327            | Error::ListTables { source, .. }
328            | Error::ListFlows { source, .. }
329            | Error::ListFlowStats { source, .. }
330            | Error::ListProcedures { source, .. }
331            | Error::ListRegionStats { source, .. }
332            | Error::ConvertProtoData { source, .. } => source.status_code(),
333
334            Error::CreateTable { source, .. } => source.status_code(),
335
336            Error::DecodePlan { source, .. } => source.status_code(),
337            Error::InvalidTableInfoInCatalog { source, .. } => source.status_code(),
338
339            Error::Internal { source, .. } => source.status_code(),
340
341            Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
342            Error::Datafusion { error, .. } => datafusion_status_code::<Self>(error, None),
343            Error::ProjectViewColumns { .. } => StatusCode::EngineExecuteQuery,
344            Error::TableMetadataManager { source, .. } => source.status_code(),
345            Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
346                source.status_code()
347            }
348        }
349    }
350
351    fn as_any(&self) -> &dyn Any {
352        self
353    }
354}
355
356impl From<Error> for DataFusionError {
357    fn from(e: Error) -> Self {
358        DataFusionError::External(Box::new(e))
359    }
360}
361
362#[cfg(test)]
363mod tests {
364    use snafu::GenerateImplicitData;
365
366    use super::*;
367
368    #[test]
369    pub fn test_errors_to_datafusion_error() {
370        let e: DataFusionError = Error::TableExists {
371            table: "test_table".to_string(),
372            location: Location::generate(),
373        }
374        .into();
375        match e {
376            DataFusionError::External(_) => {}
377            _ => {
378                panic!("catalog error should be converted to DataFusionError::Internal")
379            }
380        }
381    }
382}