log_store/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_macro::stack_trace_debug;
20use common_runtime::error::Error as RuntimeError;
21use serde_json::error::Error as JsonError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24
25#[derive(Snafu)]
26#[snafu(visibility(pub))]
27#[stack_trace_debug]
28pub enum Error {
29    #[snafu(display("Failed to create TLS Config"))]
30    TlsConfig {
31        #[snafu(implicit)]
32        location: Location,
33        source: common_wal::error::Error,
34    },
35
36    #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
37    InvalidProvider {
38        #[snafu(implicit)]
39        location: Location,
40        expected: String,
41        actual: String,
42    },
43
44    #[snafu(display("Failed to start log store task: {}", name))]
45    StartWalTask {
46        name: String,
47        #[snafu(implicit)]
48        location: Location,
49        source: RuntimeError,
50    },
51
52    #[snafu(display("Failed to stop log store task: {}", name))]
53    StopWalTask {
54        name: String,
55        #[snafu(implicit)]
56        location: Location,
57        source: RuntimeError,
58    },
59
60    #[snafu(display("Failed to add entry to LogBatch"))]
61    AddEntryLogBatch {
62        #[snafu(source)]
63        error: raft_engine::Error,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(display("Failed to perform raft-engine operation"))]
69    RaftEngine {
70        #[snafu(source)]
71        error: raft_engine::Error,
72        #[snafu(implicit)]
73        location: Location,
74    },
75
76    #[snafu(display("Failed to perform IO on path: {}", path))]
77    Io {
78        path: String,
79        #[snafu(source)]
80        error: std::io::Error,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Log store not started yet"))]
86    IllegalState {
87        #[snafu(implicit)]
88        location: Location,
89    },
90
91    #[snafu(display("Namespace is illegal: {}", ns))]
92    IllegalNamespace {
93        ns: u64,
94        #[snafu(implicit)]
95        location: Location,
96    },
97
98    #[snafu(display(
99        "Failed to fetch entries from namespace: {}, start: {}, end: {}, max size: {}",
100        ns,
101        start,
102        end,
103        max_size,
104    ))]
105    FetchEntry {
106        ns: u64,
107        start: u64,
108        end: u64,
109        max_size: usize,
110        #[snafu(source)]
111        error: raft_engine::Error,
112        #[snafu(implicit)]
113        location: Location,
114    },
115
116    #[snafu(display(
117        "Cannot override compacted entry, namespace: {}, first index: {}, attempt index: {}",
118        namespace,
119        first_index,
120        attempt_index
121    ))]
122    OverrideCompactedEntry {
123        namespace: u64,
124        first_index: u64,
125        attempt_index: u64,
126        #[snafu(implicit)]
127        location: Location,
128    },
129
130    #[snafu(display(
131        "Failed to build a Kafka client, broker endpoints: {:?}",
132        broker_endpoints
133    ))]
134    BuildClient {
135        broker_endpoints: Vec<String>,
136        #[snafu(implicit)]
137        location: Location,
138        #[snafu(source)]
139        error: rskafka::client::error::Error,
140    },
141
142    #[snafu(display(
143        "Failed to build a Kafka partition client, topic: {}, partition: {}",
144        topic,
145        partition
146    ))]
147    BuildPartitionClient {
148        topic: String,
149        partition: i32,
150        #[snafu(implicit)]
151        location: Location,
152        #[snafu(source)]
153        error: rskafka::client::error::Error,
154    },
155
156    #[snafu(display("Missing required key in a record"))]
157    MissingKey {
158        #[snafu(implicit)]
159        location: Location,
160    },
161
162    #[snafu(display("Missing required value in a record"))]
163    MissingValue {
164        #[snafu(implicit)]
165        location: Location,
166    },
167
168    #[snafu(display("Failed to produce records to Kafka, topic: {}, size: {}", topic, size))]
169    ProduceRecord {
170        topic: String,
171        size: usize,
172        #[snafu(implicit)]
173        location: Location,
174        #[snafu(source)]
175        error: rskafka::client::producer::Error,
176    },
177
178    #[snafu(display("Failed to produce batch records to Kafka"))]
179    BatchProduce {
180        #[snafu(implicit)]
181        location: Location,
182        #[snafu(source)]
183        error: rskafka::client::error::Error,
184    },
185
186    #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
187    ConsumeRecord {
188        topic: String,
189        #[snafu(implicit)]
190        location: Location,
191        #[snafu(source)]
192        error: rskafka::client::error::Error,
193    },
194
195    #[snafu(display("Failed to get the latest offset, topic: {}", topic))]
196    GetOffset {
197        topic: String,
198        #[snafu(implicit)]
199        location: Location,
200        #[snafu(source)]
201        error: rskafka::client::error::Error,
202    },
203
204    #[snafu(display("Failed to do a cast"))]
205    Cast {
206        #[snafu(implicit)]
207        location: Location,
208    },
209
210    #[snafu(display("Failed to encode object into json"))]
211    EncodeJson {
212        #[snafu(implicit)]
213        location: Location,
214        #[snafu(source)]
215        error: JsonError,
216    },
217
218    #[snafu(display("Failed to decode object from json"))]
219    DecodeJson {
220        #[snafu(implicit)]
221        location: Location,
222        #[snafu(source)]
223        error: JsonError,
224    },
225
226    #[snafu(display("The record sequence is not legal, error: {}", error))]
227    IllegalSequence {
228        #[snafu(implicit)]
229        location: Location,
230        error: String,
231    },
232
233    #[snafu(display(
234        "Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
235        region_id,
236        last_index,
237        attempt_index
238    ))]
239    DiscontinuousLogIndex {
240        region_id: RegionId,
241        last_index: u64,
242        attempt_index: u64,
243    },
244
245    #[snafu(display("OrderedBatchProducer is stopped",))]
246    OrderedBatchProducerStopped {
247        #[snafu(implicit)]
248        location: Location,
249    },
250
251    #[snafu(display("Failed to wait for ProduceResultReceiver"))]
252    WaitProduceResultReceiver {
253        #[snafu(implicit)]
254        location: Location,
255        #[snafu(source)]
256        error: tokio::sync::oneshot::error::RecvError,
257    },
258
259    #[snafu(display("Failed to wait for result of DumpIndex"))]
260    WaitDumpIndex {
261        #[snafu(implicit)]
262        location: Location,
263        #[snafu(source)]
264        error: tokio::sync::oneshot::error::RecvError,
265    },
266
267    #[snafu(display("Failed to create writer"))]
268    CreateWriter {
269        #[snafu(implicit)]
270        location: Location,
271        #[snafu(source)]
272        error: object_store::Error,
273    },
274
275    #[snafu(display("Failed to write index"))]
276    WriteIndex {
277        #[snafu(implicit)]
278        location: Location,
279        #[snafu(source)]
280        error: object_store::Error,
281    },
282
283    #[snafu(display("Failed to read index, path: {path}"))]
284    ReadIndex {
285        #[snafu(implicit)]
286        location: Location,
287        #[snafu(source)]
288        error: object_store::Error,
289        path: String,
290    },
291
292    #[snafu(display(
293        "The length of meta if exceeded the limit: {}, actual: {}",
294        limit,
295        actual
296    ))]
297    MetaLengthExceededLimit {
298        #[snafu(implicit)]
299        location: Location,
300        limit: usize,
301        actual: usize,
302    },
303
304    #[snafu(display("No max value"))]
305    NoMaxValue {
306        #[snafu(implicit)]
307        location: Location,
308    },
309}
310
311pub type Result<T> = std::result::Result<T, Error>;
312
313fn rskafka_client_error_to_status_code(error: &rskafka::client::error::Error) -> StatusCode {
314    match error {
315        rskafka::client::error::Error::Connection(_)
316        | rskafka::client::error::Error::Request(_)
317        | rskafka::client::error::Error::InvalidResponse(_)
318        | rskafka::client::error::Error::ServerError { .. }
319        | rskafka::client::error::Error::RetryFailed(_) => StatusCode::Internal,
320        rskafka::client::error::Error::Timeout => StatusCode::StorageUnavailable,
321        _ => StatusCode::Internal,
322    }
323}
324
325impl ErrorExt for Error {
326    fn as_any(&self) -> &dyn Any {
327        self
328    }
329
330    fn status_code(&self) -> StatusCode {
331        use Error::*;
332
333        match self {
334            TlsConfig { .. }
335            | InvalidProvider { .. }
336            | IllegalNamespace { .. }
337            | MissingKey { .. }
338            | MissingValue { .. }
339            | OverrideCompactedEntry { .. } => StatusCode::InvalidArguments,
340            StartWalTask { .. }
341            | StopWalTask { .. }
342            | IllegalState { .. }
343            | NoMaxValue { .. }
344            | Cast { .. }
345            | EncodeJson { .. }
346            | DecodeJson { .. }
347            | IllegalSequence { .. }
348            | DiscontinuousLogIndex { .. }
349            | OrderedBatchProducerStopped { .. }
350            | WaitProduceResultReceiver { .. }
351            | WaitDumpIndex { .. }
352            | MetaLengthExceededLimit { .. } => StatusCode::Internal,
353
354            // Object store related errors
355            CreateWriter { .. } | WriteIndex { .. } | ReadIndex { .. } | Io { .. } => {
356                StatusCode::StorageUnavailable
357            }
358            // Raft engine
359            FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => {
360                StatusCode::StorageUnavailable
361            }
362            // Kafka producer related errors
363            ProduceRecord { error, .. } => match error {
364                rskafka::client::producer::Error::Client(error) => {
365                    rskafka_client_error_to_status_code(error)
366                }
367                rskafka::client::producer::Error::Aggregator(_)
368                | rskafka::client::producer::Error::FlushError(_)
369                | rskafka::client::producer::Error::TooLarge => StatusCode::Internal,
370            },
371            BuildClient { error, .. }
372            | BuildPartitionClient { error, .. }
373            | BatchProduce { error, .. }
374            | GetOffset { error, .. }
375            | ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
376        }
377    }
378}