1use std::any::Any;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_macro::stack_trace_debug;
20use common_runtime::error::Error as RuntimeError;
21use serde_json::error::Error as JsonError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24
25#[derive(Snafu)]
26#[snafu(visibility(pub))]
27#[stack_trace_debug]
28pub enum Error {
29 #[snafu(display("Failed to create TLS Config"))]
30 TlsConfig {
31 #[snafu(implicit)]
32 location: Location,
33 source: common_wal::error::Error,
34 },
35
36 #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
37 InvalidProvider {
38 #[snafu(implicit)]
39 location: Location,
40 expected: String,
41 actual: String,
42 },
43
44 #[snafu(display("Failed to start log store task: {}", name))]
45 StartWalTask {
46 name: String,
47 #[snafu(implicit)]
48 location: Location,
49 source: RuntimeError,
50 },
51
52 #[snafu(display("Failed to stop log store task: {}", name))]
53 StopWalTask {
54 name: String,
55 #[snafu(implicit)]
56 location: Location,
57 source: RuntimeError,
58 },
59
60 #[snafu(display("Failed to add entry to LogBatch"))]
61 AddEntryLogBatch {
62 #[snafu(source)]
63 error: raft_engine::Error,
64 #[snafu(implicit)]
65 location: Location,
66 },
67
68 #[snafu(display("Failed to perform raft-engine operation"))]
69 RaftEngine {
70 #[snafu(source)]
71 error: raft_engine::Error,
72 #[snafu(implicit)]
73 location: Location,
74 },
75
76 #[snafu(display("Failed to perform IO on path: {}", path))]
77 Io {
78 path: String,
79 #[snafu(source)]
80 error: std::io::Error,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Log store not started yet"))]
86 IllegalState {
87 #[snafu(implicit)]
88 location: Location,
89 },
90
91 #[snafu(display("Namespace is illegal: {}", ns))]
92 IllegalNamespace {
93 ns: u64,
94 #[snafu(implicit)]
95 location: Location,
96 },
97
98 #[snafu(display(
99 "Failed to fetch entries from namespace: {}, start: {}, end: {}, max size: {}",
100 ns,
101 start,
102 end,
103 max_size,
104 ))]
105 FetchEntry {
106 ns: u64,
107 start: u64,
108 end: u64,
109 max_size: usize,
110 #[snafu(source)]
111 error: raft_engine::Error,
112 #[snafu(implicit)]
113 location: Location,
114 },
115
116 #[snafu(display(
117 "Cannot override compacted entry, namespace: {}, first index: {}, attempt index: {}",
118 namespace,
119 first_index,
120 attempt_index
121 ))]
122 OverrideCompactedEntry {
123 namespace: u64,
124 first_index: u64,
125 attempt_index: u64,
126 #[snafu(implicit)]
127 location: Location,
128 },
129
130 #[snafu(display(
131 "Failed to build a Kafka client, broker endpoints: {:?}",
132 broker_endpoints
133 ))]
134 BuildClient {
135 broker_endpoints: Vec<String>,
136 #[snafu(implicit)]
137 location: Location,
138 #[snafu(source)]
139 error: rskafka::client::error::Error,
140 },
141
142 #[snafu(display("Failed to resolve Kafka broker endpoint."))]
143 ResolveKafkaEndpoint { source: common_wal::error::Error },
144
145 #[snafu(display(
146 "Failed to build a Kafka partition client, topic: {}, partition: {}",
147 topic,
148 partition
149 ))]
150 BuildPartitionClient {
151 topic: String,
152 partition: i32,
153 #[snafu(implicit)]
154 location: Location,
155 #[snafu(source)]
156 error: rskafka::client::error::Error,
157 },
158
159 #[snafu(display("Missing required key in a record"))]
160 MissingKey {
161 #[snafu(implicit)]
162 location: Location,
163 },
164
165 #[snafu(display("Missing required value in a record"))]
166 MissingValue {
167 #[snafu(implicit)]
168 location: Location,
169 },
170
171 #[snafu(display("Failed to produce records to Kafka, topic: {}, size: {}", topic, size))]
172 ProduceRecord {
173 topic: String,
174 size: usize,
175 #[snafu(implicit)]
176 location: Location,
177 #[snafu(source)]
178 error: rskafka::client::producer::Error,
179 },
180
181 #[snafu(display("Failed to produce batch records to Kafka"))]
182 BatchProduce {
183 #[snafu(implicit)]
184 location: Location,
185 #[snafu(source)]
186 error: rskafka::client::error::Error,
187 },
188
189 #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
190 ConsumeRecord {
191 topic: String,
192 #[snafu(implicit)]
193 location: Location,
194 #[snafu(source)]
195 error: rskafka::client::error::Error,
196 },
197
198 #[snafu(display("Failed to get the latest offset, topic: {}", topic))]
199 GetOffset {
200 topic: String,
201 #[snafu(implicit)]
202 location: Location,
203 #[snafu(source)]
204 error: rskafka::client::error::Error,
205 },
206
207 #[snafu(display("Failed to do a cast"))]
208 Cast {
209 #[snafu(implicit)]
210 location: Location,
211 },
212
213 #[snafu(display("Failed to encode object into json"))]
214 EncodeJson {
215 #[snafu(implicit)]
216 location: Location,
217 #[snafu(source)]
218 error: JsonError,
219 },
220
221 #[snafu(display("Failed to decode object from json"))]
222 DecodeJson {
223 #[snafu(implicit)]
224 location: Location,
225 #[snafu(source)]
226 error: JsonError,
227 },
228
229 #[snafu(display("The record sequence is not legal, error: {}", error))]
230 IllegalSequence {
231 #[snafu(implicit)]
232 location: Location,
233 error: String,
234 },
235
236 #[snafu(display(
237 "Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
238 region_id,
239 last_index,
240 attempt_index
241 ))]
242 DiscontinuousLogIndex {
243 region_id: RegionId,
244 last_index: u64,
245 attempt_index: u64,
246 },
247
248 #[snafu(display("OrderedBatchProducer is stopped",))]
249 OrderedBatchProducerStopped {
250 #[snafu(implicit)]
251 location: Location,
252 },
253
254 #[snafu(display("Failed to wait for ProduceResultReceiver"))]
255 WaitProduceResultReceiver {
256 #[snafu(implicit)]
257 location: Location,
258 #[snafu(source)]
259 error: tokio::sync::oneshot::error::RecvError,
260 },
261
262 #[snafu(display("Failed to wait for result of DumpIndex"))]
263 WaitDumpIndex {
264 #[snafu(implicit)]
265 location: Location,
266 #[snafu(source)]
267 error: tokio::sync::oneshot::error::RecvError,
268 },
269
270 #[snafu(display("Failed to create writer"))]
271 CreateWriter {
272 #[snafu(implicit)]
273 location: Location,
274 #[snafu(source)]
275 error: object_store::Error,
276 },
277
278 #[snafu(display("Failed to write index"))]
279 WriteIndex {
280 #[snafu(implicit)]
281 location: Location,
282 #[snafu(source)]
283 error: object_store::Error,
284 },
285
286 #[snafu(display("Failed to read index, path: {path}"))]
287 ReadIndex {
288 #[snafu(implicit)]
289 location: Location,
290 #[snafu(source)]
291 error: object_store::Error,
292 path: String,
293 },
294
295 #[snafu(display(
296 "The length of meta if exceeded the limit: {}, actual: {}",
297 limit,
298 actual
299 ))]
300 MetaLengthExceededLimit {
301 #[snafu(implicit)]
302 location: Location,
303 limit: usize,
304 actual: usize,
305 },
306
307 #[snafu(display("No max value"))]
308 NoMaxValue {
309 #[snafu(implicit)]
310 location: Location,
311 },
312}
313
314pub type Result<T> = std::result::Result<T, Error>;
315
316fn rskafka_client_error_to_status_code(error: &rskafka::client::error::Error) -> StatusCode {
317 match error {
318 rskafka::client::error::Error::Connection(_)
319 | rskafka::client::error::Error::Request(_)
320 | rskafka::client::error::Error::InvalidResponse(_)
321 | rskafka::client::error::Error::ServerError { .. }
322 | rskafka::client::error::Error::RetryFailed(_) => StatusCode::Internal,
323 rskafka::client::error::Error::Timeout => StatusCode::StorageUnavailable,
324 _ => StatusCode::Internal,
325 }
326}
327
328impl ErrorExt for Error {
329 fn as_any(&self) -> &dyn Any {
330 self
331 }
332
333 fn status_code(&self) -> StatusCode {
334 use Error::*;
335
336 match self {
337 TlsConfig { .. }
338 | InvalidProvider { .. }
339 | IllegalNamespace { .. }
340 | MissingKey { .. }
341 | MissingValue { .. }
342 | OverrideCompactedEntry { .. } => StatusCode::InvalidArguments,
343 StartWalTask { .. }
344 | StopWalTask { .. }
345 | IllegalState { .. }
346 | ResolveKafkaEndpoint { .. }
347 | NoMaxValue { .. }
348 | Cast { .. }
349 | EncodeJson { .. }
350 | DecodeJson { .. }
351 | IllegalSequence { .. }
352 | DiscontinuousLogIndex { .. }
353 | OrderedBatchProducerStopped { .. }
354 | WaitProduceResultReceiver { .. }
355 | WaitDumpIndex { .. }
356 | MetaLengthExceededLimit { .. } => StatusCode::Internal,
357
358 CreateWriter { .. } | WriteIndex { .. } | ReadIndex { .. } | Io { .. } => {
360 StatusCode::StorageUnavailable
361 }
362 FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => {
364 StatusCode::StorageUnavailable
365 }
366 ProduceRecord { error, .. } => match error {
368 rskafka::client::producer::Error::Client(error) => {
369 rskafka_client_error_to_status_code(error)
370 }
371 rskafka::client::producer::Error::Aggregator(_)
372 | rskafka::client::producer::Error::FlushError(_)
373 | rskafka::client::producer::Error::TooLarge => StatusCode::Internal,
374 },
375 BuildClient { error, .. }
376 | BuildPartitionClient { error, .. }
377 | BatchProduce { error, .. }
378 | GetOffset { error, .. }
379 | ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
380 }
381 }
382}