log_store/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_macro::stack_trace_debug;
20use common_runtime::error::Error as RuntimeError;
21use serde_json::error::Error as JsonError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24
25#[derive(Snafu)]
26#[snafu(visibility(pub))]
27#[stack_trace_debug]
28pub enum Error {
29    #[snafu(display("Failed to create TLS Config"))]
30    TlsConfig {
31        #[snafu(implicit)]
32        location: Location,
33        source: common_wal::error::Error,
34    },
35
36    #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
37    InvalidProvider {
38        #[snafu(implicit)]
39        location: Location,
40        expected: String,
41        actual: String,
42    },
43
44    #[snafu(display("Failed to start log store task: {}", name))]
45    StartWalTask {
46        name: String,
47        #[snafu(implicit)]
48        location: Location,
49        source: RuntimeError,
50    },
51
52    #[snafu(display("Failed to stop log store task: {}", name))]
53    StopWalTask {
54        name: String,
55        #[snafu(implicit)]
56        location: Location,
57        source: RuntimeError,
58    },
59
60    #[snafu(display("Failed to add entry to LogBatch"))]
61    AddEntryLogBatch {
62        #[snafu(source)]
63        error: raft_engine::Error,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(display("Failed to perform raft-engine operation"))]
69    RaftEngine {
70        #[snafu(source)]
71        error: raft_engine::Error,
72        #[snafu(implicit)]
73        location: Location,
74    },
75
76    #[snafu(display("Failed to perform IO on path: {}", path))]
77    Io {
78        path: String,
79        #[snafu(source)]
80        error: std::io::Error,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Log store not started yet"))]
86    IllegalState {
87        #[snafu(implicit)]
88        location: Location,
89    },
90
91    #[snafu(display("Namespace is illegal: {}", ns))]
92    IllegalNamespace {
93        ns: u64,
94        #[snafu(implicit)]
95        location: Location,
96    },
97
98    #[snafu(display(
99        "Failed to fetch entries from namespace: {}, start: {}, end: {}, max size: {}",
100        ns,
101        start,
102        end,
103        max_size,
104    ))]
105    FetchEntry {
106        ns: u64,
107        start: u64,
108        end: u64,
109        max_size: usize,
110        #[snafu(source)]
111        error: raft_engine::Error,
112        #[snafu(implicit)]
113        location: Location,
114    },
115
116    #[snafu(display(
117        "Cannot override compacted entry, namespace: {}, first index: {}, attempt index: {}",
118        namespace,
119        first_index,
120        attempt_index
121    ))]
122    OverrideCompactedEntry {
123        namespace: u64,
124        first_index: u64,
125        attempt_index: u64,
126        #[snafu(implicit)]
127        location: Location,
128    },
129
130    #[snafu(display(
131        "Failed to build a Kafka client, broker endpoints: {:?}",
132        broker_endpoints
133    ))]
134    BuildClient {
135        broker_endpoints: Vec<String>,
136        #[snafu(implicit)]
137        location: Location,
138        #[snafu(source)]
139        error: rskafka::client::error::Error,
140    },
141
142    #[snafu(display("Failed to resolve Kafka broker endpoint."))]
143    ResolveKafkaEndpoint { source: common_wal::error::Error },
144
145    #[snafu(display(
146        "Failed to build a Kafka partition client, topic: {}, partition: {}",
147        topic,
148        partition
149    ))]
150    BuildPartitionClient {
151        topic: String,
152        partition: i32,
153        #[snafu(implicit)]
154        location: Location,
155        #[snafu(source)]
156        error: rskafka::client::error::Error,
157    },
158
159    #[snafu(display("Missing required key in a record"))]
160    MissingKey {
161        #[snafu(implicit)]
162        location: Location,
163    },
164
165    #[snafu(display("Missing required value in a record"))]
166    MissingValue {
167        #[snafu(implicit)]
168        location: Location,
169    },
170
171    #[snafu(display("Failed to produce records to Kafka, topic: {}, size: {}", topic, size))]
172    ProduceRecord {
173        topic: String,
174        size: usize,
175        #[snafu(implicit)]
176        location: Location,
177        #[snafu(source)]
178        error: rskafka::client::producer::Error,
179    },
180
181    #[snafu(display("Failed to produce batch records to Kafka"))]
182    BatchProduce {
183        #[snafu(implicit)]
184        location: Location,
185        #[snafu(source)]
186        error: rskafka::client::error::Error,
187    },
188
189    #[snafu(display("Failed to read a record from Kafka, topic: {}", topic))]
190    ConsumeRecord {
191        topic: String,
192        #[snafu(implicit)]
193        location: Location,
194        #[snafu(source)]
195        error: rskafka::client::error::Error,
196    },
197
198    #[snafu(display("Failed to get the latest offset, topic: {}", topic))]
199    GetOffset {
200        topic: String,
201        #[snafu(implicit)]
202        location: Location,
203        #[snafu(source)]
204        error: rskafka::client::error::Error,
205    },
206
207    #[snafu(display("Failed to do a cast"))]
208    Cast {
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Failed to encode object into json"))]
214    EncodeJson {
215        #[snafu(implicit)]
216        location: Location,
217        #[snafu(source)]
218        error: JsonError,
219    },
220
221    #[snafu(display("Failed to decode object from json"))]
222    DecodeJson {
223        #[snafu(implicit)]
224        location: Location,
225        #[snafu(source)]
226        error: JsonError,
227    },
228
229    #[snafu(display("The record sequence is not legal, error: {}", error))]
230    IllegalSequence {
231        #[snafu(implicit)]
232        location: Location,
233        error: String,
234    },
235
236    #[snafu(display(
237        "Attempt to append discontinuous log entry, region: {}, last index: {}, attempt index: {}",
238        region_id,
239        last_index,
240        attempt_index
241    ))]
242    DiscontinuousLogIndex {
243        region_id: RegionId,
244        last_index: u64,
245        attempt_index: u64,
246    },
247
248    #[snafu(display("OrderedBatchProducer is stopped",))]
249    OrderedBatchProducerStopped {
250        #[snafu(implicit)]
251        location: Location,
252    },
253
254    #[snafu(display("Failed to wait for ProduceResultReceiver"))]
255    WaitProduceResultReceiver {
256        #[snafu(implicit)]
257        location: Location,
258        #[snafu(source)]
259        error: tokio::sync::oneshot::error::RecvError,
260    },
261
262    #[snafu(display("Failed to wait for result of DumpIndex"))]
263    WaitDumpIndex {
264        #[snafu(implicit)]
265        location: Location,
266        #[snafu(source)]
267        error: tokio::sync::oneshot::error::RecvError,
268    },
269
270    #[snafu(display("Failed to create writer"))]
271    CreateWriter {
272        #[snafu(implicit)]
273        location: Location,
274        #[snafu(source)]
275        error: object_store::Error,
276    },
277
278    #[snafu(display("Failed to write index"))]
279    WriteIndex {
280        #[snafu(implicit)]
281        location: Location,
282        #[snafu(source)]
283        error: object_store::Error,
284    },
285
286    #[snafu(display("Failed to read index, path: {path}"))]
287    ReadIndex {
288        #[snafu(implicit)]
289        location: Location,
290        #[snafu(source)]
291        error: object_store::Error,
292        path: String,
293    },
294
295    #[snafu(display(
296        "The length of meta if exceeded the limit: {}, actual: {}",
297        limit,
298        actual
299    ))]
300    MetaLengthExceededLimit {
301        #[snafu(implicit)]
302        location: Location,
303        limit: usize,
304        actual: usize,
305    },
306
307    #[snafu(display("No max value"))]
308    NoMaxValue {
309        #[snafu(implicit)]
310        location: Location,
311    },
312}
313
314pub type Result<T> = std::result::Result<T, Error>;
315
316fn rskafka_client_error_to_status_code(error: &rskafka::client::error::Error) -> StatusCode {
317    match error {
318        rskafka::client::error::Error::Connection(_)
319        | rskafka::client::error::Error::Request(_)
320        | rskafka::client::error::Error::InvalidResponse(_)
321        | rskafka::client::error::Error::ServerError { .. }
322        | rskafka::client::error::Error::RetryFailed(_) => StatusCode::Internal,
323        rskafka::client::error::Error::Timeout => StatusCode::StorageUnavailable,
324        _ => StatusCode::Internal,
325    }
326}
327
328impl ErrorExt for Error {
329    fn as_any(&self) -> &dyn Any {
330        self
331    }
332
333    fn status_code(&self) -> StatusCode {
334        use Error::*;
335
336        match self {
337            TlsConfig { .. }
338            | InvalidProvider { .. }
339            | IllegalNamespace { .. }
340            | MissingKey { .. }
341            | MissingValue { .. }
342            | OverrideCompactedEntry { .. } => StatusCode::InvalidArguments,
343            StartWalTask { .. }
344            | StopWalTask { .. }
345            | IllegalState { .. }
346            | ResolveKafkaEndpoint { .. }
347            | NoMaxValue { .. }
348            | Cast { .. }
349            | EncodeJson { .. }
350            | DecodeJson { .. }
351            | IllegalSequence { .. }
352            | DiscontinuousLogIndex { .. }
353            | OrderedBatchProducerStopped { .. }
354            | WaitProduceResultReceiver { .. }
355            | WaitDumpIndex { .. }
356            | MetaLengthExceededLimit { .. } => StatusCode::Internal,
357
358            // Object store related errors
359            CreateWriter { .. } | WriteIndex { .. } | ReadIndex { .. } | Io { .. } => {
360                StatusCode::StorageUnavailable
361            }
362            // Raft engine
363            FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => {
364                StatusCode::StorageUnavailable
365            }
366            // Kafka producer related errors
367            ProduceRecord { error, .. } => match error {
368                rskafka::client::producer::Error::Client(error) => {
369                    rskafka_client_error_to_status_code(error)
370                }
371                rskafka::client::producer::Error::Aggregator(_)
372                | rskafka::client::producer::Error::FlushError(_)
373                | rskafka::client::producer::Error::TooLarge => StatusCode::Internal,
374            },
375            BuildClient { error, .. }
376            | BuildPartitionClient { error, .. }
377            | BatchProduce { error, .. }
378            | GetOffset { error, .. }
379            | ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
380        }
381    }
382}