1use common_error::define_from_tonic_status;
16use common_error::ext::{ErrorExt, RetryHint};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use snafu::{Location, Snafu};
20
/// Errors defined by this module.
///
/// Most variants record the `Location` at which they were created (captured
/// implicitly by snafu). Variants that wrap another error keep it in a
/// `source`/`error` field.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// The gRPC client is in an illegal state; `err_msg` describes the problem.
    #[snafu(display("Illegal GRPC client state: {}", err_msg))]
    IllegalGrpcClientState {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// An error carrying its own status code, message, tonic code and retry hint.
    ///
    /// Displayed as the bare `msg`. This is the variant named in the
    /// `define_from_tonic_status!` invocation in this file, so it is presumably
    /// the one built when converting from a `tonic::Status` (TODO confirm against
    /// the macro definition).
    #[snafu(display("{}", msg))]
    MetaServer {
        code: StatusCode,
        msg: String,
        tonic_code: tonic::Code,
        retry_hint: RetryHint,
        #[snafu(implicit)]
        location: Location,
    },

    /// No leader is known; the leader should be asked for first.
    #[snafu(display("No leader, should ask leader first"))]
    NoLeader {
        #[snafu(implicit)]
        location: Location,
    },

    /// Asking for the leader timed out; wraps the tokio `Elapsed` error.
    #[snafu(display("Ask leader timeout"))]
    AskLeaderTimeout {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: tokio::time::error::Elapsed,
    },

    /// Creating the gRPC channel failed; wraps the `common_grpc` error.
    #[snafu(display("Failed to create gRPC channel"))]
    CreateChannel {
        #[snafu(implicit)]
        location: Location,
        source: common_grpc::error::Error,
    },

    /// The component called `name` has not been started.
    #[snafu(display("{} not started", name))]
    NotStarted {
        name: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Sending a heartbeat failed; `err_msg` describes the failure.
    #[snafu(display("Failed to send heartbeat: {}", err_msg))]
    SendHeartbeat {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The heartbeat stream to the server could not be created.
    #[snafu(display("Failed create heartbeat stream to server"))]
    CreateHeartbeatStream {
        #[snafu(implicit)]
        location: Location,
    },

    /// A response header was invalid; wraps the `common_meta` error.
    #[snafu(display("Invalid response header"))]
    InvalidResponseHeader {
        #[snafu(implicit)]
        location: Location,
        source: common_meta::error::Error,
    },

    /// Converting a Metasrv request failed; wraps the `common_meta` error.
    #[snafu(display("Failed to convert Metasrv request"))]
    ConvertMetaRequest {
        #[snafu(implicit)]
        location: Location,
        source: common_meta::error::Error,
    },

    /// Converting a Metasrv response failed; wraps the `common_meta` error.
    #[snafu(display("Failed to convert Metasrv response"))]
    ConvertMetaResponse {
        #[snafu(implicit)]
        location: Location,
        source: common_meta::error::Error,
    },

    /// Getting the flow stat failed; wraps the `common_meta` error.
    #[snafu(display("Failed to get flow stat"))]
    GetFlowStat {
        #[snafu(implicit)]
        location: Location,
        source: common_meta::error::Error,
    },

    /// Retrying gave up after `times` attempts; `msg` carries the accompanying message.
    ///
    /// Unlike the other variants, this one records no `location`.
    #[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
    RetryTimesExceeded { times: usize, msg: String },

    /// A write was attempted on the read-only kv backend called `name`.
    #[snafu(display("Trying to write to a read-only kv backend: {}", name))]
    ReadOnlyKvBackend {
        name: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Converting the meta config failed; wraps the `serde_json` error.
    #[snafu(display("Failed to convert meta config"))]
    ConvertMetaConfig {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: serde_json::Error,
    },
}
129
/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
// NOTE(review): the reason for `allow(dead_code)` is not visible here; presumably
// the alias is unused in some build configurations — confirm.
#[allow(dead_code)]
pub type Result<T> = std::result::Result<T, Error>;
132
133impl ErrorExt for Error {
134 fn as_any(&self) -> &dyn std::any::Any {
135 self
136 }
137
138 fn status_code(&self) -> StatusCode {
139 match self {
140 Error::IllegalGrpcClientState { .. }
141 | Error::NoLeader { .. }
142 | Error::AskLeaderTimeout { .. }
143 | Error::NotStarted { .. }
144 | Error::SendHeartbeat { .. }
145 | Error::CreateHeartbeatStream { .. }
146 | Error::CreateChannel { .. }
147 | Error::RetryTimesExceeded { .. }
148 | Error::ConvertMetaConfig { .. } => StatusCode::Internal,
149
150 Error::ReadOnlyKvBackend { .. } => StatusCode::Unsupported,
151
152 Error::MetaServer { code, .. } => *code,
153
154 Error::InvalidResponseHeader { source, .. }
155 | Error::ConvertMetaRequest { source, .. }
156 | Error::ConvertMetaResponse { source, .. }
157 | Error::GetFlowStat { source, .. } => source.status_code(),
158 }
159 }
160
161 fn retry_hint(&self) -> RetryHint {
162 match self {
163 Error::MetaServer { retry_hint, .. } => *retry_hint,
164 Error::InvalidResponseHeader { source, .. }
165 | Error::ConvertMetaRequest { source, .. }
166 | Error::ConvertMetaResponse { source, .. }
167 | Error::GetFlowStat { source, .. } => source.retry_hint(),
168 Error::CreateChannel { source, .. } => source.retry_hint(),
169 _ => RetryHint::NonRetryable,
170 }
171 }
172}
173
174impl Error {
175 pub fn is_exceeded_size_limit(&self) -> bool {
176 matches!(
177 self,
178 Error::MetaServer {
179 tonic_code: tonic::Code::OutOfRange,
180 ..
181 }
182 )
183 }
184}
185
// Generates the conversion that lets a `tonic::Status` be turned into `Error`
// (the tests below rely on `status.into()`). The second argument names the
// `MetaServer` variant, presumably the one the conversion constructs — the macro
// body lives in `common_error`, so confirm there.
define_from_tonic_status!(Error, MetaServer);
187
#[cfg(test)]
mod tests {
    use common_error::ext::{ErrorExt, RetryHint};
    use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT};
    use tonic::codegen::http::{HeaderMap, HeaderValue};
    use tonic::metadata::MetadataMap;

    use super::*;

    /// Builds a header map holding only the error-code header for `code`.
    fn headers_with_code(code: StatusCode) -> HeaderMap {
        let mut map = HeaderMap::new();
        map.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code as u32));
        map
    }

    /// Wraps `headers` as the metadata of an `Internal` status with a fixed message.
    fn internal_status(headers: HeaderMap) -> tonic::Status {
        let metadata = MetadataMap::from_headers(headers);
        tonic::Status::with_metadata(tonic::Code::Internal, "blabla", metadata)
    }

    /// A status without any metadata converts into a non-retryable error.
    #[test]
    fn test_from_tonic_status_fallbacks_to_status_code() {
        let plain = tonic::Status::new(tonic::Code::Internal, "blabla");

        let converted: Error = plain.into();

        assert_eq!(converted.retry_hint(), RetryHint::NonRetryable);
    }

    /// With only an `InvalidArguments` code header, the error is non-retryable.
    #[test]
    fn test_from_tonic_status_fallback_can_be_non_retryable() {
        let status = internal_status(headers_with_code(StatusCode::InvalidArguments));

        let converted: Error = status.into();

        assert_eq!(converted.retry_hint(), RetryHint::NonRetryable);
    }

    /// An explicit retry-hint header is carried over to the converted error.
    #[test]
    fn test_from_tonic_status_with_retry_hint() {
        let mut headers = headers_with_code(StatusCode::Internal);
        headers.insert(
            GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
            HeaderValue::from_static(RetryHint::Retryable.as_str()),
        );
        let status = internal_status(headers);

        let converted: Error = status.into();

        assert_eq!(converted.retry_hint(), RetryHint::Retryable);
    }
}