Skip to main content

meta_client/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_from_tonic_status;
16use common_error::ext::{ErrorExt, RetryHint};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use snafu::{Location, Snafu};
20
21#[derive(Snafu)]
22#[snafu(visibility(pub))]
23#[stack_trace_debug]
24pub enum Error {
25    #[snafu(display("Illegal GRPC client state: {}", err_msg))]
26    IllegalGrpcClientState {
27        err_msg: String,
28        #[snafu(implicit)]
29        location: Location,
30    },
31
32    #[snafu(display("{}", msg))]
33    MetaServer {
34        code: StatusCode,
35        msg: String,
36        tonic_code: tonic::Code,
37        retry_hint: RetryHint,
38        #[snafu(implicit)]
39        location: Location,
40    },
41
42    #[snafu(display("No leader, should ask leader first"))]
43    NoLeader {
44        #[snafu(implicit)]
45        location: Location,
46    },
47
48    #[snafu(display("Ask leader timeout"))]
49    AskLeaderTimeout {
50        #[snafu(implicit)]
51        location: Location,
52        #[snafu(source)]
53        error: tokio::time::error::Elapsed,
54    },
55
56    #[snafu(display("Failed to create gRPC channel"))]
57    CreateChannel {
58        #[snafu(implicit)]
59        location: Location,
60        source: common_grpc::error::Error,
61    },
62
63    #[snafu(display("{} not started", name))]
64    NotStarted {
65        name: String,
66        #[snafu(implicit)]
67        location: Location,
68    },
69
70    #[snafu(display("Failed to send heartbeat: {}", err_msg))]
71    SendHeartbeat {
72        err_msg: String,
73        #[snafu(implicit)]
74        location: Location,
75    },
76
77    #[snafu(display("Failed create heartbeat stream to server"))]
78    CreateHeartbeatStream {
79        #[snafu(implicit)]
80        location: Location,
81    },
82
83    #[snafu(display("Invalid response header"))]
84    InvalidResponseHeader {
85        #[snafu(implicit)]
86        location: Location,
87        source: common_meta::error::Error,
88    },
89
90    #[snafu(display("Failed to convert Metasrv request"))]
91    ConvertMetaRequest {
92        #[snafu(implicit)]
93        location: Location,
94        source: common_meta::error::Error,
95    },
96
97    #[snafu(display("Failed to convert Metasrv response"))]
98    ConvertMetaResponse {
99        #[snafu(implicit)]
100        location: Location,
101        source: common_meta::error::Error,
102    },
103
104    #[snafu(display("Failed to get flow stat"))]
105    GetFlowStat {
106        #[snafu(implicit)]
107        location: Location,
108        source: common_meta::error::Error,
109    },
110
111    #[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
112    RetryTimesExceeded { times: usize, msg: String },
113
114    #[snafu(display("Trying to write to a read-only kv backend: {}", name))]
115    ReadOnlyKvBackend {
116        name: String,
117        #[snafu(implicit)]
118        location: Location,
119    },
120
121    #[snafu(display("Failed to convert meta config"))]
122    ConvertMetaConfig {
123        #[snafu(implicit)]
124        location: Location,
125        #[snafu(source)]
126        error: serde_json::Error,
127    },
128}
129
130#[allow(dead_code)]
131pub type Result<T> = std::result::Result<T, Error>;
132
133impl ErrorExt for Error {
134    fn as_any(&self) -> &dyn std::any::Any {
135        self
136    }
137
138    fn status_code(&self) -> StatusCode {
139        match self {
140            Error::IllegalGrpcClientState { .. }
141            | Error::NoLeader { .. }
142            | Error::AskLeaderTimeout { .. }
143            | Error::NotStarted { .. }
144            | Error::SendHeartbeat { .. }
145            | Error::CreateHeartbeatStream { .. }
146            | Error::CreateChannel { .. }
147            | Error::RetryTimesExceeded { .. }
148            | Error::ConvertMetaConfig { .. } => StatusCode::Internal,
149
150            Error::ReadOnlyKvBackend { .. } => StatusCode::Unsupported,
151
152            Error::MetaServer { code, .. } => *code,
153
154            Error::InvalidResponseHeader { source, .. }
155            | Error::ConvertMetaRequest { source, .. }
156            | Error::ConvertMetaResponse { source, .. }
157            | Error::GetFlowStat { source, .. } => source.status_code(),
158        }
159    }
160
161    fn retry_hint(&self) -> RetryHint {
162        match self {
163            Error::MetaServer { retry_hint, .. } => *retry_hint,
164            Error::InvalidResponseHeader { source, .. }
165            | Error::ConvertMetaRequest { source, .. }
166            | Error::ConvertMetaResponse { source, .. }
167            | Error::GetFlowStat { source, .. } => source.retry_hint(),
168            Error::CreateChannel { source, .. } => source.retry_hint(),
169            _ => RetryHint::NonRetryable,
170        }
171    }
172}
173
174impl Error {
175    pub fn is_exceeded_size_limit(&self) -> bool {
176        matches!(
177            self,
178            Error::MetaServer {
179                tonic_code: tonic::Code::OutOfRange,
180                ..
181            }
182        )
183    }
184}
185
186define_from_tonic_status!(Error, MetaServer);
187
188#[cfg(test)]
189mod tests {
190    use common_error::ext::{ErrorExt, RetryHint};
191    use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT};
192    use tonic::codegen::http::{HeaderMap, HeaderValue};
193    use tonic::metadata::MetadataMap;
194
195    use super::*;
196
197    #[test]
198    fn test_from_tonic_status_fallbacks_to_status_code() {
199        let status = tonic::Status::new(tonic::Code::Internal, "blabla");
200
201        let err: Error = status.into();
202
203        assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
204    }
205
206    #[test]
207    fn test_from_tonic_status_fallback_can_be_non_retryable() {
208        let mut headers = HeaderMap::new();
209        headers.insert(
210            GREPTIME_DB_HEADER_ERROR_CODE,
211            HeaderValue::from(StatusCode::InvalidArguments as u32),
212        );
213        let status = tonic::Status::with_metadata(
214            tonic::Code::Internal,
215            "blabla",
216            MetadataMap::from_headers(headers),
217        );
218
219        let err: Error = status.into();
220
221        assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
222    }
223
224    #[test]
225    fn test_from_tonic_status_with_retry_hint() {
226        let mut headers = HeaderMap::new();
227        headers.insert(
228            GREPTIME_DB_HEADER_ERROR_CODE,
229            HeaderValue::from(StatusCode::Internal as u32),
230        );
231        headers.insert(
232            GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
233            HeaderValue::from_static(RetryHint::Retryable.as_str()),
234        );
235        let status = tonic::Status::with_metadata(
236            tonic::Code::Internal,
237            "blabla",
238            MetadataMap::from_headers(headers),
239        );
240
241        let err: Error = status.into();
242
243        assert_eq!(err.retry_hint(), RetryHint::Retryable);
244    }
245}