common_datasource/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use arrow_schema::ArrowError;
18use common_error::ext::ErrorExt;
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion::parquet::errors::ParquetError;
22use snafu::{Location, Snafu};
23use url::ParseError;
24
25#[derive(Snafu)]
26#[snafu(visibility(pub))]
27#[stack_trace_debug]
28pub enum Error {
29    #[snafu(display("Unsupported compression type: {}", compression_type))]
30    UnsupportedCompressionType {
31        compression_type: String,
32        #[snafu(implicit)]
33        location: Location,
34    },
35
36    #[snafu(display("Unsupported backend protocol: {}, url: {}", protocol, url))]
37    UnsupportedBackendProtocol {
38        protocol: String,
39        #[snafu(implicit)]
40        location: Location,
41        url: String,
42    },
43
44    #[snafu(display("Unsupported format protocol: {}", format))]
45    UnsupportedFormat {
46        format: String,
47        #[snafu(implicit)]
48        location: Location,
49    },
50
51    #[snafu(display("empty host: {}", url))]
52    EmptyHostPath {
53        url: String,
54        #[snafu(implicit)]
55        location: Location,
56    },
57
58    #[snafu(display("Invalid url: {}", url))]
59    InvalidUrl {
60        url: String,
61        #[snafu(source)]
62        error: ParseError,
63        #[snafu(implicit)]
64        location: Location,
65    },
66
67    #[snafu(display("Failed to build backend"))]
68    BuildBackend {
69        #[snafu(source)]
70        error: object_store::Error,
71        #[snafu(implicit)]
72        location: Location,
73    },
74
75    #[snafu(display("Failed to build orc reader"))]
76    OrcReader {
77        #[snafu(implicit)]
78        location: Location,
79        #[snafu(source)]
80        error: orc_rust::error::OrcError,
81    },
82
83    #[snafu(display("Failed to read object from path: {}", path))]
84    ReadObject {
85        path: String,
86        #[snafu(implicit)]
87        location: Location,
88        #[snafu(source)]
89        error: object_store::Error,
90    },
91
92    #[snafu(display("Failed to write object to path: {}", path))]
93    WriteObject {
94        path: String,
95        #[snafu(implicit)]
96        location: Location,
97        #[snafu(source)]
98        error: object_store::Error,
99    },
100
101    #[snafu(display("Failed to write"))]
102    AsyncWrite {
103        #[snafu(source)]
104        error: std::io::Error,
105        #[snafu(implicit)]
106        location: Location,
107    },
108
109    #[snafu(display("Failed to write record batch"))]
110    WriteRecordBatch {
111        #[snafu(implicit)]
112        location: Location,
113        #[snafu(source)]
114        error: ArrowError,
115    },
116
117    #[snafu(display("Failed to encode record batch"))]
118    EncodeRecordBatch {
119        #[snafu(implicit)]
120        location: Location,
121        #[snafu(source)]
122        error: ParquetError,
123    },
124
125    #[snafu(display("Failed to read record batch"))]
126    ReadRecordBatch {
127        #[snafu(implicit)]
128        location: Location,
129        #[snafu(source)]
130        error: datafusion::error::DataFusionError,
131    },
132
133    #[snafu(display("Failed to read parquet"))]
134    ReadParquetSnafu {
135        #[snafu(implicit)]
136        location: Location,
137        #[snafu(source)]
138        error: datafusion::parquet::errors::ParquetError,
139    },
140
141    #[snafu(display("Failed to convert parquet to schema"))]
142    ParquetToSchema {
143        #[snafu(implicit)]
144        location: Location,
145        #[snafu(source)]
146        error: datafusion::parquet::errors::ParquetError,
147    },
148
149    #[snafu(display("Failed to infer schema from file"))]
150    InferSchema {
151        #[snafu(implicit)]
152        location: Location,
153        #[snafu(source)]
154        error: arrow_schema::ArrowError,
155    },
156
157    #[snafu(display("Failed to list object in path: {}", path))]
158    ListObjects {
159        path: String,
160        #[snafu(implicit)]
161        location: Location,
162        #[snafu(source)]
163        error: object_store::Error,
164    },
165
166    #[snafu(display("Invalid connection: {}", msg))]
167    InvalidConnection {
168        msg: String,
169        #[snafu(implicit)]
170        location: Location,
171    },
172
173    #[snafu(display("Failed to join handle"))]
174    JoinHandle {
175        #[snafu(implicit)]
176        location: Location,
177        #[snafu(source)]
178        error: tokio::task::JoinError,
179    },
180
181    #[snafu(display("Failed to parse format {} with value: {}", key, value))]
182    ParseFormat {
183        key: String,
184        value: String,
185        #[snafu(implicit)]
186        location: Location,
187    },
188
189    #[snafu(display("Failed to merge schema"))]
190    MergeSchema {
191        #[snafu(source)]
192        error: arrow_schema::ArrowError,
193        #[snafu(implicit)]
194        location: Location,
195    },
196
197    #[snafu(display("Buffered writer closed"))]
198    BufferedWriterClosed {
199        #[snafu(implicit)]
200        location: Location,
201    },
202
203    #[snafu(display("Failed to write parquet file, path: {}", path))]
204    WriteParquet {
205        path: String,
206        #[snafu(implicit)]
207        location: Location,
208        #[snafu(source)]
209        error: parquet::errors::ParquetError,
210    },
211}
212
213pub type Result<T> = std::result::Result<T, Error>;
214
215impl ErrorExt for Error {
216    fn status_code(&self) -> StatusCode {
217        use Error::*;
218        match self {
219            BuildBackend { .. }
220            | ListObjects { .. }
221            | ReadObject { .. }
222            | WriteObject { .. }
223            | AsyncWrite { .. }
224            | WriteParquet { .. } => StatusCode::StorageUnavailable,
225
226            UnsupportedBackendProtocol { .. }
227            | UnsupportedCompressionType { .. }
228            | UnsupportedFormat { .. }
229            | InvalidConnection { .. }
230            | InvalidUrl { .. }
231            | EmptyHostPath { .. }
232            | InferSchema { .. }
233            | ReadParquetSnafu { .. }
234            | ParquetToSchema { .. }
235            | ParseFormat { .. }
236            | MergeSchema { .. } => StatusCode::InvalidArguments,
237
238            JoinHandle { .. }
239            | ReadRecordBatch { .. }
240            | WriteRecordBatch { .. }
241            | EncodeRecordBatch { .. }
242            | BufferedWriterClosed { .. }
243            | OrcReader { .. } => StatusCode::Unexpected,
244        }
245    }
246
247    fn as_any(&self) -> &dyn Any {
248        self
249    }
250}