1use std::any::Any;
16
17use common_error::ext::{BoxedError, ErrorExt};
18use common_error::status_code::{convert_tonic_code_to_status_code, StatusCode};
19use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
20use common_macro::stack_trace_debug;
21use snafu::{location, Location, Snafu};
22use tonic::metadata::errors::InvalidMetadataValue;
23use tonic::{Code, Status};
24
/// Error type used by this file's `Result` alias.
///
/// Each variant's `#[snafu(display(...))]` attribute defines its message, and the
/// `ErrorExt` implementation in this file maps every variant to a `StatusCode`.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// Flight messages were illegal; `reason` is the explanation shown in the message.
    #[snafu(display("Illegal Flight messages, reason: {}", reason))]
    IllegalFlightMessages {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// A Flight get failed with tonic status `tonic_code`.
    ///
    /// `addr` is stored on the variant but is not part of the display message, and
    /// unlike the other variants this one carries no `location` field.
    #[snafu(display("Failed to do Flight get, code: {}", tonic_code))]
    FlightGet {
        addr: String,
        tonic_code: Code,
        source: BoxedError,
    },

    /// Converting `FlightData` failed; wraps the underlying `common_grpc::Error`.
    #[snafu(display("Failed to convert FlightData"))]
    ConvertFlightData {
        #[snafu(implicit)]
        location: Location,
        source: common_grpc::Error,
    },

    /// The gRPC client was in an illegal state, described by `err_msg`.
    #[snafu(display("Illegal GRPC client state: {}", err_msg))]
    IllegalGrpcClientState {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// A required protobuf field, named by `field`, was missing.
    #[snafu(display("Missing required field in protobuf, field: {}", field))]
    MissingField {
        field: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// Creating a gRPC channel to the peer at `addr` failed.
    #[snafu(display("Failed to create gRPC channel, peer address: {}", addr))]
    CreateChannel {
        addr: String,
        #[snafu(implicit)]
        location: Location,
        source: common_grpc::error::Error,
    },

    /// Creating the TLS channel manager failed.
    #[snafu(display("Failed to create Tls channel manager"))]
    CreateTlsChannel {
        #[snafu(implicit)]
        location: Location,
        source: common_grpc::error::Error,
    },

    /// A request to the RegionServer at `addr` failed with tonic status `code`.
    ///
    /// This is the only variant that `Error::should_retry` inspects.
    #[snafu(display("Failed to request RegionServer {}, code: {}", addr, code))]
    RegionServer {
        addr: String,
        code: Code,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// A request to the FlowServer at `addr` failed with tonic status `code`.
    #[snafu(display("Failed to request FlowServer {}, code: {}", addr, code))]
    FlowServer {
        addr: String,
        code: Code,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    /// An error whose display text is exactly `msg`, tagged with a `StatusCode`.
    ///
    /// The `From<Status>` implementation in this file builds this variant from a
    /// tonic `Status`.
    #[snafu(display("{}", msg))]
    Server {
        code: StatusCode,
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// The database response was illegal, described by `err_msg`.
    #[snafu(display("Illegal Database response: {err_msg}"))]
    IllegalDatabaseResponse {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    /// A tonic metadata value was invalid; wraps tonic's `InvalidMetadataValue`.
    #[snafu(display("Invalid Tonic metadata value"))]
    InvalidTonicMetadataValue {
        #[snafu(source)]
        error: InvalidMetadataValue,
        #[snafu(implicit)]
        location: Location,
    },
}
121
/// Shorthand for `std::result::Result` with this file's `Error` as the error type.
pub type Result<T> = std::result::Result<T, Error>;
123
124impl ErrorExt for Error {
125 fn status_code(&self) -> StatusCode {
126 match self {
127 Error::IllegalFlightMessages { .. }
128 | Error::MissingField { .. }
129 | Error::IllegalDatabaseResponse { .. } => StatusCode::Internal,
130
131 Error::Server { code, .. } => *code,
132 Error::FlightGet { source, .. }
133 | Error::RegionServer { source, .. }
134 | Error::FlowServer { source, .. } => source.status_code(),
135 Error::CreateChannel { source, .. }
136 | Error::ConvertFlightData { source, .. }
137 | Error::CreateTlsChannel { source, .. } => source.status_code(),
138 Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
139 Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
140 }
141 }
142
143 fn as_any(&self) -> &dyn Any {
144 self
145 }
146}
147
148impl From<Status> for Error {
149 fn from(e: Status) -> Self {
150 fn get_metadata_value(e: &Status, key: &str) -> Option<String> {
151 e.metadata()
152 .get(key)
153 .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
154 }
155
156 let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE).and_then(|s| {
157 if let Ok(code) = s.parse::<u32>() {
158 StatusCode::from_u32(code)
159 } else {
160 None
161 }
162 });
163 let tonic_code = e.code();
164 let code = code.unwrap_or_else(|| convert_tonic_code_to_status_code(tonic_code));
165
166 let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
167 .unwrap_or_else(|| e.message().to_string());
168
169 Self::Server {
170 code,
171 msg,
172 location: location!(),
173 }
174 }
175}
176
177impl Error {
178 pub fn should_retry(&self) -> bool {
179 matches!(
181 self,
182 Self::RegionServer {
183 code: Code::Cancelled,
184 ..
185 } | Self::RegionServer {
186 code: Code::DeadlineExceeded,
187 ..
188 } | Self::RegionServer {
189 code: Code::Unavailable,
190 ..
191 }
192 )
193 }
194}