1use std::any::Any;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_macro::stack_trace_debug;
20use common_runtime::error::Error as RuntimeError;
21use serde_json::error::Error as JsonError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24
25#[derive(Snafu)]
26#[snafu(visibility(pub))]
27#[stack_trace_debug]
28pub enum Error {
29 #[snafu(display("Failed to create TLS Config"))]
30 TlsConfig {
31 #[snafu(implicit)]
32 location: Location,
33 source: common_wal::error::Error,
34 },
35
36 #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
37 InvalidProvider {
38 #[snafu(implicit)]
39 location: Location,
40 expected: String,
41 actual: String,
42 },
43
44 #[snafu(display("Failed to start log store task: {}", name))]
45 StartWalTask {
46 name: String,
47 #[snafu(implicit)]
48 location: Location,
49 source: RuntimeError,
50 },
51
52 #[snafu(display("Failed to stop log store task: {}", name))]
53 StopWalTask {
54 name: String,
55 #[snafu(implicit)]
56 location: Location,
57 source: RuntimeError,
58 },
59
60 #[snafu(display("Failed to add entry to LogBatch"))]
61 AddEntryLogBatch {
62 #[snafu(source)]
63 error: raft_engine::Error,
64 #[snafu(implicit)]
65 location: Location,
66 },
67
68 #[snafu(display("Failed to perform raft-engine operation"))]
69 RaftEngine {
70 #[snafu(source)]
71 error: raft_engine::Error,
72 #[snafu(implicit)]
73 location: Location,
74 },
75
76 #[snafu(display("Failed to perform IO on path: {}", path))]
77 Io {
78 path: String,
79 #[snafu(source)]
80 error: std::io::Error,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Log store not started yet"))]
86 IllegalState {
87 #[snafu(implicit)]
88 location: Location,
89 },
90
91 #[snafu(display("Namespace is illegal: {}", ns))]
92 IllegalNamespace {
93 ns: u64,
94 #[snafu(implicit)]
95 location: Location,
96 },
97
98 #[snafu(display(
99 "Failed to fetch entries from namespace: {}, start: {}, end: {}, max size: {}",
100 ns,
101 start,
102 end,
103 max_size,
104 ))]
105 FetchEntry {
106 ns: u64,
107 start: u64,
108 end: u64,
109 max_size: usize,
110 #[snafu(source)]
111 error: raft_engine::Error,
112 #[snafu(implicit)]
113 location: Location,
114 },
115
116 #[snafu(display(
117 "Cannot override compacted entry, namespace: {}, first index: {}, attempt index: {}",
118 namespace,
119 first_index,
120 attempt_index
121 ))]
122 OverrideCompactedEntry {
123 namespace: u64,
124 first_index: u64,
125 attempt_index: u64,
126 #[snafu(implicit)]
127 location: Location,
128 },
129
130 #[snafu(display(
131 "Failed to build a Kafka client, broker endpoints: {:?}",
132 broker_endpoints
133 ))]
134 BuildClient {
135 broker_endpoints: Vec<String>,
136 #[snafu(implicit)]
137 location: Location,
138 #[snafu(source)]
139 error: rskafka::client::error::Error,
140 },
141
142 #[snafu(display(
143 "Failed to build a Kafka partition client, topic: {}, partition: {}",
144 topic,
145 partition
146 ))]
147 BuildPartitionClient {
148 topic: String,
149 partition: i32,
150 #[snafu(implicit)]
151 location: Location,
152 #[snafu(source)]
153 error: rskafka::client::error::Error,
154 },
155
156 #[snafu(display("Missing required key in a record"))]
157 MissingKey {
158 #[snafu(implicit)]
159 location: Location,
160 },
161
162 #[snafu(display("Missing required value in a record"))]
163 MissingValue {
164 #[snafu(implicit)]
165 location: Location,
166 },
167
168 #[snafu(display("Failed to produce records to Kafka, topic: {}, size: {}", topic, size))]
169 ProduceRecord {
170 topic: String,
171 size: usize,
172 #[snafu(implicit)]
173 location: Location,
174 #[snafu(source)]
175 error: rskafka::client::producer::Error,
176 },
177
178 #[snafu(display("Failed to produce batch records to Kafka"))]
179 BatchProduce {
180 #[snafu(implicit)]
181 location: Location,
182 #[snafu(source)]
183 error: rskafka::client::error::Error,
184 },
185
186 #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
187 ConsumeRecord {
188 topic: String,
189 #[snafu(implicit)]
190 location: Location,
191 #[snafu(source)]
192 error: rskafka::client::error::Error,
193 },
194
195 #[snafu(display("Failed to get the latest offset, topic: {}", topic))]
196 GetOffset {
197 topic: String,
198 #[snafu(implicit)]
199 location: Location,
200 #[snafu(source)]
201 error: rskafka::client::error::Error,
202 },
203
204 #[snafu(display("Failed to do a cast"))]
205 Cast {
206 #[snafu(implicit)]
207 location: Location,
208 },
209
210 #[snafu(display("Failed to encode object into json"))]
211 EncodeJson {
212 #[snafu(implicit)]
213 location: Location,
214 #[snafu(source)]
215 error: JsonError,
216 },
217
218 #[snafu(display("Failed to decode object from json"))]
219 DecodeJson {
220 #[snafu(implicit)]
221 location: Location,
222 #[snafu(source)]
223 error: JsonError,
224 },
225
226 #[snafu(display("The record sequence is not legal, error: {}", error))]
227 IllegalSequence {
228 #[snafu(implicit)]
229 location: Location,
230 error: String,
231 },
232
233 #[snafu(display(
234 "Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
235 region_id,
236 last_index,
237 attempt_index
238 ))]
239 DiscontinuousLogIndex {
240 region_id: RegionId,
241 last_index: u64,
242 attempt_index: u64,
243 },
244
245 #[snafu(display("OrderedBatchProducer is stopped",))]
246 OrderedBatchProducerStopped {
247 #[snafu(implicit)]
248 location: Location,
249 },
250
251 #[snafu(display("Failed to wait for ProduceResultReceiver"))]
252 WaitProduceResultReceiver {
253 #[snafu(implicit)]
254 location: Location,
255 #[snafu(source)]
256 error: tokio::sync::oneshot::error::RecvError,
257 },
258
259 #[snafu(display("Failed to wait for result of DumpIndex"))]
260 WaitDumpIndex {
261 #[snafu(implicit)]
262 location: Location,
263 #[snafu(source)]
264 error: tokio::sync::oneshot::error::RecvError,
265 },
266
267 #[snafu(display("Failed to create writer"))]
268 CreateWriter {
269 #[snafu(implicit)]
270 location: Location,
271 #[snafu(source)]
272 error: object_store::Error,
273 },
274
275 #[snafu(display("Failed to write index"))]
276 WriteIndex {
277 #[snafu(implicit)]
278 location: Location,
279 #[snafu(source)]
280 error: object_store::Error,
281 },
282
283 #[snafu(display("Failed to read index, path: {path}"))]
284 ReadIndex {
285 #[snafu(implicit)]
286 location: Location,
287 #[snafu(source)]
288 error: object_store::Error,
289 path: String,
290 },
291
292 #[snafu(display(
293 "The length of meta if exceeded the limit: {}, actual: {}",
294 limit,
295 actual
296 ))]
297 MetaLengthExceededLimit {
298 #[snafu(implicit)]
299 location: Location,
300 limit: usize,
301 actual: usize,
302 },
303
304 #[snafu(display("No max value"))]
305 NoMaxValue {
306 #[snafu(implicit)]
307 location: Location,
308 },
309}
310
311pub type Result<T> = std::result::Result<T, Error>;
312
313fn rskafka_client_error_to_status_code(error: &rskafka::client::error::Error) -> StatusCode {
314 match error {
315 rskafka::client::error::Error::Connection(_)
316 | rskafka::client::error::Error::Request(_)
317 | rskafka::client::error::Error::InvalidResponse(_)
318 | rskafka::client::error::Error::ServerError { .. }
319 | rskafka::client::error::Error::RetryFailed(_) => StatusCode::Internal,
320 rskafka::client::error::Error::Timeout => StatusCode::StorageUnavailable,
321 _ => StatusCode::Internal,
322 }
323}
324
325impl ErrorExt for Error {
326 fn as_any(&self) -> &dyn Any {
327 self
328 }
329
330 fn status_code(&self) -> StatusCode {
331 use Error::*;
332
333 match self {
334 TlsConfig { .. }
335 | InvalidProvider { .. }
336 | IllegalNamespace { .. }
337 | MissingKey { .. }
338 | MissingValue { .. }
339 | OverrideCompactedEntry { .. } => StatusCode::InvalidArguments,
340 StartWalTask { .. }
341 | StopWalTask { .. }
342 | IllegalState { .. }
343 | NoMaxValue { .. }
344 | Cast { .. }
345 | EncodeJson { .. }
346 | DecodeJson { .. }
347 | IllegalSequence { .. }
348 | DiscontinuousLogIndex { .. }
349 | OrderedBatchProducerStopped { .. }
350 | WaitProduceResultReceiver { .. }
351 | WaitDumpIndex { .. }
352 | MetaLengthExceededLimit { .. } => StatusCode::Internal,
353
354 CreateWriter { .. } | WriteIndex { .. } | ReadIndex { .. } | Io { .. } => {
356 StatusCode::StorageUnavailable
357 }
358 FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => {
360 StatusCode::StorageUnavailable
361 }
362 ProduceRecord { error, .. } => match error {
364 rskafka::client::producer::Error::Client(error) => {
365 rskafka_client_error_to_status_code(error)
366 }
367 rskafka::client::producer::Error::Aggregator(_)
368 | rskafka::client::producer::Error::FlushError(_)
369 | rskafka::client::producer::Error::TooLarge => StatusCode::Internal,
370 },
371 BuildClient { error, .. }
372 | BuildPartitionClient { error, .. }
373 | BatchProduce { error, .. }
374 | GetOffset { error, .. }
375 | ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
376 }
377 }
378}