common_recordbatch/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Error of record batch.
16use std::any::Any;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use datafusion_common::ScalarValue;
22use datatypes::prelude::ConcreteDataType;
23use datatypes::schema::SchemaRef;
24use snafu::{Location, Snafu};
25
26pub type Result<T, E = Error> = std::result::Result<T, E>;
27
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Fail to create datafusion record batch"))]
33    NewDfRecordBatch {
34        #[snafu(source)]
35        error: datatypes::arrow::error::ArrowError,
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display("Data types error"))]
41    DataTypes {
42        #[snafu(implicit)]
43        location: Location,
44        source: datatypes::error::Error,
45    },
46
47    #[snafu(display("External error"))]
48    External {
49        #[snafu(implicit)]
50        location: Location,
51        source: BoxedError,
52    },
53
54    #[snafu(display("Failed to create RecordBatches, reason: {}", reason))]
55    CreateRecordBatches {
56        reason: String,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to convert Arrow schema"))]
62    SchemaConversion {
63        source: datatypes::error::Error,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(transparent)]
69    PollStream {
70        #[snafu(source)]
71        error: datafusion::error::DataFusionError,
72        #[snafu(implicit)]
73        location: Location,
74    },
75
76    #[snafu(display("Create physical expr error"))]
77    PhysicalExpr {
78        #[snafu(source)]
79        error: datafusion::error::DataFusionError,
80        #[snafu(implicit)]
81        location: Location,
82    },
83
84    #[snafu(display("Fail to format record batch"))]
85    Format {
86        #[snafu(source)]
87        error: datatypes::arrow::error::ArrowError,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Failed to convert {v:?} to Arrow scalar"))]
93    ToArrowScalar {
94        v: ScalarValue,
95        #[snafu(source)]
96        error: datafusion_common::DataFusionError,
97        #[snafu(implicit)]
98        location: Location,
99    },
100
101    #[snafu(display(
102        "Failed to project Arrow RecordBatch with schema {:?} and projection {:?}",
103        schema,
104        projection,
105    ))]
106    ProjectArrowRecordBatch {
107        #[snafu(source)]
108        error: datatypes::arrow::error::ArrowError,
109        #[snafu(implicit)]
110        location: Location,
111        schema: datatypes::schema::SchemaRef,
112        projection: Vec<usize>,
113    },
114
115    #[snafu(display("Column {} not exists in table {}", column_name, table_name))]
116    ColumnNotExists {
117        column_name: String,
118        table_name: String,
119        #[snafu(implicit)]
120        location: Location,
121    },
122
123    #[snafu(display(
124        "Failed to cast vector of type '{:?}' to type '{:?}'",
125        from_type,
126        to_type,
127    ))]
128    CastVector {
129        from_type: ConcreteDataType,
130        to_type: ConcreteDataType,
131        #[snafu(implicit)]
132        location: Location,
133        source: datatypes::error::Error,
134    },
135
136    #[snafu(display("Error occurs when performing arrow computation"))]
137    ArrowCompute {
138        #[snafu(source)]
139        error: datatypes::arrow::error::ArrowError,
140        #[snafu(implicit)]
141        location: Location,
142    },
143
144    #[snafu(display("Unsupported operation: {}", reason))]
145    UnsupportedOperation {
146        reason: String,
147        #[snafu(implicit)]
148        location: Location,
149    },
150
151    #[snafu(display("Cannot construct an empty stream"))]
152    EmptyStream {
153        #[snafu(implicit)]
154        location: Location,
155    },
156
157    #[snafu(display("Schema not match, left: {:?}, right: {:?}", left, right))]
158    SchemaNotMatch {
159        left: SchemaRef,
160        right: SchemaRef,
161        #[snafu(implicit)]
162        location: Location,
163    },
164    #[snafu(display("Stream timeout"))]
165    StreamTimeout {
166        #[snafu(implicit)]
167        location: Location,
168        #[snafu(source)]
169        error: tokio::time::error::Elapsed,
170    },
171    #[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]
172    RecordBatchSliceIndexOverflow {
173        #[snafu(implicit)]
174        location: Location,
175        size: usize,
176        visit_index: usize,
177    },
178}
179
180impl ErrorExt for Error {
181    fn status_code(&self) -> StatusCode {
182        match self {
183            Error::NewDfRecordBatch { .. }
184            | Error::EmptyStream { .. }
185            | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
186
187            Error::DataTypes { .. }
188            | Error::CreateRecordBatches { .. }
189            | Error::Format { .. }
190            | Error::ToArrowScalar { .. }
191            | Error::ProjectArrowRecordBatch { .. }
192            | Error::PhysicalExpr { .. }
193            | Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
194
195            Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
196
197            Error::ArrowCompute { .. } => StatusCode::IllegalState,
198
199            Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
200
201            Error::External { source, .. } => source.status_code(),
202
203            Error::UnsupportedOperation { .. } => StatusCode::Unsupported,
204
205            Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
206                source.status_code()
207            }
208
209            Error::StreamTimeout { .. } => StatusCode::Cancelled,
210        }
211    }
212
213    fn as_any(&self) -> &dyn Any {
214        self
215    }
216}