meta_client/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::ErrorExt;
16use common_error::status_code::{convert_tonic_code_to_status_code, StatusCode};
17use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
18use common_macro::stack_trace_debug;
19use snafu::{location, Location, Snafu};
20use tonic::Status;
21
22#[derive(Snafu)]
23#[snafu(visibility(pub))]
24#[stack_trace_debug]
25pub enum Error {
26    #[snafu(display("Illegal GRPC client state: {}", err_msg))]
27    IllegalGrpcClientState {
28        err_msg: String,
29        #[snafu(implicit)]
30        location: Location,
31    },
32
33    #[snafu(display("{}", msg))]
34    MetaServer {
35        code: StatusCode,
36        msg: String,
37        tonic_code: tonic::Code,
38        #[snafu(implicit)]
39        location: Location,
40    },
41
42    #[snafu(display("No leader, should ask leader first"))]
43    NoLeader {
44        #[snafu(implicit)]
45        location: Location,
46    },
47
48    #[snafu(display("Ask leader timeout"))]
49    AskLeaderTimeout {
50        #[snafu(implicit)]
51        location: Location,
52        #[snafu(source)]
53        error: tokio::time::error::Elapsed,
54    },
55
56    #[snafu(display("Failed to create gRPC channel"))]
57    CreateChannel {
58        #[snafu(implicit)]
59        location: Location,
60        source: common_grpc::error::Error,
61    },
62
63    #[snafu(display("{} not started", name))]
64    NotStarted {
65        name: String,
66        #[snafu(implicit)]
67        location: Location,
68    },
69
70    #[snafu(display("Failed to send heartbeat: {}", err_msg))]
71    SendHeartbeat {
72        err_msg: String,
73        #[snafu(implicit)]
74        location: Location,
75    },
76
77    #[snafu(display("Failed create heartbeat stream to server"))]
78    CreateHeartbeatStream {
79        #[snafu(implicit)]
80        location: Location,
81    },
82
83    #[snafu(display("Invalid response header"))]
84    InvalidResponseHeader {
85        #[snafu(implicit)]
86        location: Location,
87        source: common_meta::error::Error,
88    },
89
90    #[snafu(display("Failed to convert Metasrv request"))]
91    ConvertMetaRequest {
92        #[snafu(implicit)]
93        location: Location,
94        source: common_meta::error::Error,
95    },
96
97    #[snafu(display("Failed to convert Metasrv response"))]
98    ConvertMetaResponse {
99        #[snafu(implicit)]
100        location: Location,
101        source: common_meta::error::Error,
102    },
103
104    #[snafu(display("Failed to get flow stat"))]
105    GetFlowStat {
106        #[snafu(implicit)]
107        location: Location,
108        source: common_meta::error::Error,
109    },
110
111    #[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
112    RetryTimesExceeded { times: usize, msg: String },
113
114    #[snafu(display("Trying to write to a read-only kv backend: {}", name))]
115    ReadOnlyKvBackend {
116        name: String,
117        #[snafu(implicit)]
118        location: Location,
119    },
120}
121
122#[allow(dead_code)]
123pub type Result<T> = std::result::Result<T, Error>;
124
125impl ErrorExt for Error {
126    fn as_any(&self) -> &dyn std::any::Any {
127        self
128    }
129
130    fn status_code(&self) -> StatusCode {
131        match self {
132            Error::IllegalGrpcClientState { .. }
133            | Error::NoLeader { .. }
134            | Error::AskLeaderTimeout { .. }
135            | Error::NotStarted { .. }
136            | Error::SendHeartbeat { .. }
137            | Error::CreateHeartbeatStream { .. }
138            | Error::CreateChannel { .. }
139            | Error::RetryTimesExceeded { .. }
140            | Error::ReadOnlyKvBackend { .. } => StatusCode::Internal,
141
142            Error::MetaServer { code, .. } => *code,
143
144            Error::InvalidResponseHeader { source, .. }
145            | Error::ConvertMetaRequest { source, .. }
146            | Error::ConvertMetaResponse { source, .. }
147            | Error::GetFlowStat { source, .. } => source.status_code(),
148        }
149    }
150}
151
152impl Error {
153    pub fn is_exceeded_size_limit(&self) -> bool {
154        matches!(
155            self,
156            Error::MetaServer {
157                tonic_code: tonic::Code::OutOfRange,
158                ..
159            }
160        )
161    }
162}
163
164// FIXME(dennis): partial duplicated with src/client/src/error.rs
165impl From<Status> for Error {
166    fn from(e: Status) -> Self {
167        fn get_metadata_value(s: &Status, key: &str) -> Option<String> {
168            s.metadata()
169                .get(key)
170                .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
171        }
172
173        let code = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_CODE).and_then(|s| {
174            if let Ok(code) = s.parse::<u32>() {
175                StatusCode::from_u32(code)
176            } else {
177                None
178            }
179        });
180        let tonic_code = e.code();
181        let code = code.unwrap_or_else(|| convert_tonic_code_to_status_code(tonic_code));
182
183        let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
184            .unwrap_or_else(|| e.message().to_string());
185
186        Self::MetaServer {
187            code,
188            msg,
189            tonic_code,
190            location: location!(),
191        }
192    }
193}