Skip to main content

meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt, RetryHint};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_procedure::ProcedureId;
21use common_runtime::JoinError;
22use common_wal::kafka::rskafka_client_error_to_retry_hint;
23use snafu::{Location, Snafu};
24use store_api::storage::RegionId;
25use table::metadata::TableId;
26use tokio::sync::mpsc::error::SendError;
27use tonic::codegen::http;
28use uuid::Uuid;
29
30use crate::metasrv::SelectTarget;
31use crate::pubsub::Message;
32use crate::service::mailbox::Channel;
33
34#[derive(Snafu)]
35#[snafu(visibility(pub))]
36#[stack_trace_debug]
37pub enum Error {
38    #[snafu(display("Failed to choose items"))]
39    ChooseItems {
40        #[snafu(implicit)]
41        location: Location,
42        #[snafu(source)]
43        error: rand::distr::weighted::Error,
44    },
45
46    #[snafu(display("Exceeded deadline, operation: {}", operation))]
47    ExceededDeadline {
48        #[snafu(implicit)]
49        location: Location,
50        operation: String,
51    },
52
53    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
54    PeerUnavailable {
55        #[snafu(implicit)]
56        location: Location,
57        peer_id: u64,
58    },
59
60    #[snafu(display("Failed to list active frontends"))]
61    ListActiveFrontends {
62        #[snafu(implicit)]
63        location: Location,
64        source: common_meta::error::Error,
65    },
66
67    #[snafu(display("Failed to list active datanodes"))]
68    ListActiveDatanodes {
69        #[snafu(implicit)]
70        location: Location,
71        source: common_meta::error::Error,
72    },
73
74    #[snafu(display("Failed to list active flownodes"))]
75    ListActiveFlownodes {
76        #[snafu(implicit)]
77        location: Location,
78        source: common_meta::error::Error,
79    },
80
81    #[snafu(display("No available frontend"))]
82    NoAvailableFrontend {
83        #[snafu(implicit)]
84        location: Location,
85    },
86
87    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
88    MigrationRunning {
89        #[snafu(implicit)]
90        location: Location,
91        region_id: RegionId,
92    },
93
94    #[snafu(display(
95        "The region migration procedure is completed for region: {}, target_peer: {}",
96        region_id,
97        target_peer_id
98    ))]
99    RegionMigrated {
100        #[snafu(implicit)]
101        location: Location,
102        region_id: RegionId,
103        target_peer_id: u64,
104    },
105
106    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
107    MigrationAbort {
108        #[snafu(implicit)]
109        location: Location,
110        reason: String,
111    },
112
113    #[snafu(display(
114        "Another procedure is operating the region: {} on peer: {}",
115        region_id,
116        peer_id
117    ))]
118    RegionOperatingRace {
119        #[snafu(implicit)]
120        location: Location,
121        peer_id: DatanodeId,
122        region_id: RegionId,
123    },
124
125    #[snafu(display("Failed to init ddl manager"))]
126    InitDdlManager {
127        #[snafu(implicit)]
128        location: Location,
129        source: common_meta::error::Error,
130    },
131
132    #[snafu(display("Failed to init reconciliation manager"))]
133    InitReconciliationManager {
134        #[snafu(implicit)]
135        location: Location,
136        source: common_meta::error::Error,
137    },
138
139    #[snafu(display("Failed to create default catalog and schema"))]
140    InitMetadata {
141        #[snafu(implicit)]
142        location: Location,
143        source: common_meta::error::Error,
144    },
145
146    #[snafu(display("Failed to allocate next sequence number"))]
147    NextSequence {
148        #[snafu(implicit)]
149        location: Location,
150        source: common_meta::error::Error,
151    },
152
153    #[snafu(display("Failed to set next sequence number"))]
154    SetNextSequence {
155        #[snafu(implicit)]
156        location: Location,
157        source: common_meta::error::Error,
158    },
159
160    #[snafu(display("Failed to peek sequence number"))]
161    PeekSequence {
162        #[snafu(implicit)]
163        location: Location,
164        source: common_meta::error::Error,
165    },
166
167    #[snafu(display("Failed to start telemetry task"))]
168    StartTelemetryTask {
169        #[snafu(implicit)]
170        location: Location,
171        source: common_runtime::error::Error,
172    },
173
174    #[snafu(display("Failed to submit ddl task"))]
175    SubmitDdlTask {
176        #[snafu(implicit)]
177        location: Location,
178        source: common_meta::error::Error,
179    },
180
181    #[snafu(display("Failed to submit reconcile procedure"))]
182    SubmitReconcileProcedure {
183        #[snafu(implicit)]
184        location: Location,
185        source: common_meta::error::Error,
186    },
187
188    #[snafu(display("Failed to invalidate table cache"))]
189    InvalidateTableCache {
190        #[snafu(implicit)]
191        location: Location,
192        source: common_meta::error::Error,
193    },
194
195    #[snafu(display("Failed to list catalogs"))]
196    ListCatalogs {
197        #[snafu(implicit)]
198        location: Location,
199        source: BoxedError,
200    },
201
202    #[snafu(display("Failed to list {}'s schemas", catalog))]
203    ListSchemas {
204        #[snafu(implicit)]
205        location: Location,
206        catalog: String,
207        source: BoxedError,
208    },
209
210    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
211    ListTables {
212        #[snafu(implicit)]
213        location: Location,
214        catalog: String,
215        schema: String,
216        source: BoxedError,
217    },
218
219    #[snafu(display("Failed to join a future"))]
220    Join {
221        #[snafu(implicit)]
222        location: Location,
223        #[snafu(source)]
224        error: JoinError,
225    },
226
227    #[snafu(display(
228        "Failed to request {}, required: {}, but only {} available",
229        select_target,
230        required,
231        available
232    ))]
233    NoEnoughAvailableNode {
234        #[snafu(implicit)]
235        location: Location,
236        required: usize,
237        available: usize,
238        select_target: SelectTarget,
239    },
240
241    #[snafu(display("Failed to send shutdown signal"))]
242    SendShutdownSignal {
243        #[snafu(source)]
244        error: SendError<()>,
245    },
246
247    #[snafu(display("Failed to shutdown {} server", server))]
248    ShutdownServer {
249        #[snafu(implicit)]
250        location: Location,
251        source: servers::error::Error,
252        server: String,
253    },
254
255    #[snafu(display("Empty key is not allowed"))]
256    EmptyKey {
257        #[snafu(implicit)]
258        location: Location,
259    },
260
261    #[snafu(display("Failed to execute via Etcd"))]
262    EtcdFailed {
263        #[snafu(source)]
264        error: etcd_client::Error,
265        #[snafu(implicit)]
266        location: Location,
267    },
268
269    #[snafu(display("Failed to connect to Etcd"))]
270    ConnectEtcd {
271        #[snafu(source)]
272        error: etcd_client::Error,
273        #[snafu(implicit)]
274        location: Location,
275    },
276
277    #[snafu(display("Failed to read file: {}", path))]
278    FileIo {
279        #[snafu(source)]
280        error: std::io::Error,
281        #[snafu(implicit)]
282        location: Location,
283        path: String,
284    },
285
286    #[snafu(display("Failed to bind address {}", addr))]
287    TcpBind {
288        addr: String,
289        #[snafu(source)]
290        error: std::io::Error,
291        #[snafu(implicit)]
292        location: Location,
293    },
294
295    #[snafu(display("Failed to start gRPC server"))]
296    StartGrpc {
297        #[snafu(source)]
298        error: tonic::transport::Error,
299        #[snafu(implicit)]
300        location: Location,
301    },
302
303    #[snafu(display("Failed to start http server"))]
304    StartHttp {
305        #[snafu(implicit)]
306        location: Location,
307        source: servers::error::Error,
308    },
309
310    #[snafu(display("Failed to parse address {}", addr))]
311    ParseAddr {
312        addr: String,
313        #[snafu(source)]
314        error: std::net::AddrParseError,
315    },
316
317    #[snafu(display("Invalid lease key: {}", key))]
318    InvalidLeaseKey {
319        key: String,
320        #[snafu(implicit)]
321        location: Location,
322    },
323
324    #[snafu(display("Invalid datanode stat key: {}", key))]
325    InvalidStatKey {
326        key: String,
327        #[snafu(implicit)]
328        location: Location,
329    },
330
331    #[snafu(display("Invalid inactive region key: {}", key))]
332    InvalidInactiveRegionKey {
333        key: String,
334        #[snafu(implicit)]
335        location: Location,
336    },
337
338    #[snafu(display("Failed to parse lease key from utf8"))]
339    LeaseKeyFromUtf8 {
340        #[snafu(source)]
341        error: std::string::FromUtf8Error,
342        #[snafu(implicit)]
343        location: Location,
344    },
345
346    #[snafu(display("Failed to parse lease value from utf8"))]
347    LeaseValueFromUtf8 {
348        #[snafu(source)]
349        error: std::string::FromUtf8Error,
350        #[snafu(implicit)]
351        location: Location,
352    },
353
354    #[snafu(display("Failed to parse invalid region key from utf8"))]
355    InvalidRegionKeyFromUtf8 {
356        #[snafu(source)]
357        error: std::string::FromUtf8Error,
358        #[snafu(implicit)]
359        location: Location,
360    },
361
362    #[snafu(display("Failed to serialize to json: {}", input))]
363    SerializeToJson {
364        input: String,
365        #[snafu(source)]
366        error: serde_json::error::Error,
367        #[snafu(implicit)]
368        location: Location,
369    },
370
371    #[snafu(display("Failed to deserialize from json: {}", input))]
372    DeserializeFromJson {
373        input: String,
374        #[snafu(source)]
375        error: serde_json::error::Error,
376        #[snafu(implicit)]
377        location: Location,
378    },
379
380    #[snafu(display("Failed to serialize config"))]
381    SerializeConfig {
382        #[snafu(source)]
383        error: serde_json::error::Error,
384        #[snafu(implicit)]
385        location: Location,
386    },
387
388    #[snafu(display("Failed to parse number: {}", err_msg))]
389    ParseNum {
390        err_msg: String,
391        #[snafu(source)]
392        error: std::num::ParseIntError,
393        #[snafu(implicit)]
394        location: Location,
395    },
396
397    #[snafu(display("Failed to parse bool: {}", err_msg))]
398    ParseBool {
399        err_msg: String,
400        #[snafu(source)]
401        error: std::str::ParseBoolError,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
407    DowngradeLeader {
408        region_id: RegionId,
409        #[snafu(implicit)]
410        location: Location,
411        #[snafu(source)]
412        source: BoxedError,
413    },
414
415    #[snafu(display("Region's leader peer changed: {}", msg))]
416    LeaderPeerChanged {
417        msg: String,
418        #[snafu(implicit)]
419        location: Location,
420    },
421
422    #[snafu(display("Invalid arguments: {}", err_msg))]
423    InvalidArguments {
424        err_msg: String,
425        #[snafu(implicit)]
426        location: Location,
427    },
428
429    #[cfg(feature = "mysql_kvbackend")]
430    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
431    ParseMySqlUrl {
432        #[snafu(source)]
433        error: sqlx::error::Error,
434        mysql_url: String,
435        #[snafu(implicit)]
436        location: Location,
437    },
438
439    #[cfg(feature = "mysql_kvbackend")]
440    #[snafu(display("Failed to decode sql value"))]
441    DecodeSqlValue {
442        #[snafu(source)]
443        error: sqlx::error::Error,
444        #[snafu(implicit)]
445        location: Location,
446    },
447
448    #[snafu(display("Failed to find table route for {table_id}"))]
449    TableRouteNotFound {
450        table_id: TableId,
451        #[snafu(implicit)]
452        location: Location,
453    },
454
455    #[snafu(display("Failed to find table route for {region_id}"))]
456    RegionRouteNotFound {
457        region_id: RegionId,
458        #[snafu(implicit)]
459        location: Location,
460    },
461
462    #[snafu(display("Table info not found: {}", table_id))]
463    TableInfoNotFound {
464        table_id: TableId,
465        #[snafu(implicit)]
466        location: Location,
467    },
468
469    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
470    DatanodeTableNotFound {
471        table_id: TableId,
472        datanode_id: DatanodeId,
473        #[snafu(implicit)]
474        location: Location,
475    },
476
477    #[snafu(display("Metasrv has no leader at this moment"))]
478    NoLeader {
479        #[snafu(implicit)]
480        location: Location,
481    },
482
483    #[snafu(display("Leader lease expired"))]
484    LeaderLeaseExpired {
485        #[snafu(implicit)]
486        location: Location,
487    },
488
489    #[snafu(display("Leader lease changed during election"))]
490    LeaderLeaseChanged {
491        #[snafu(implicit)]
492        location: Location,
493    },
494
495    #[snafu(display("Table {} not found", name))]
496    TableNotFound {
497        name: String,
498        #[snafu(implicit)]
499        location: Location,
500    },
501
502    #[snafu(display("Unsupported selector type, {}", selector_type))]
503    UnsupportedSelectorType {
504        selector_type: String,
505        #[snafu(implicit)]
506        location: Location,
507    },
508
509    #[snafu(display("Unexpected, violated: {violated}"))]
510    Unexpected {
511        violated: String,
512        #[snafu(implicit)]
513        location: Location,
514    },
515
516    #[snafu(display("Failed to create gRPC channel"))]
517    CreateChannel {
518        #[snafu(implicit)]
519        location: Location,
520        source: common_grpc::error::Error,
521    },
522
523    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
524    BatchGet {
525        #[snafu(source)]
526        error: tonic::Status,
527        #[snafu(implicit)]
528        location: Location,
529    },
530
531    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
532    Range {
533        #[snafu(source)]
534        error: tonic::Status,
535        #[snafu(implicit)]
536        location: Location,
537    },
538
539    #[snafu(display("Response header not found"))]
540    ResponseHeaderNotFound {
541        #[snafu(implicit)]
542        location: Location,
543    },
544
545    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
546    IsNotLeader {
547        node_addr: String,
548        #[snafu(implicit)]
549        location: Location,
550    },
551
552    #[snafu(display("Invalid http body"))]
553    InvalidHttpBody {
554        #[snafu(source)]
555        error: http::Error,
556        #[snafu(implicit)]
557        location: Location,
558    },
559
560    #[snafu(display(
561        "The number of retries for the grpc call {} exceeded the limit, {}",
562        func_name,
563        retry_num
564    ))]
565    ExceededRetryLimit {
566        func_name: String,
567        retry_num: usize,
568        #[snafu(implicit)]
569        location: Location,
570    },
571
572    #[snafu(display("Invalid utf-8 value"))]
573    InvalidUtf8Value {
574        #[snafu(source)]
575        error: std::string::FromUtf8Error,
576        #[snafu(implicit)]
577        location: Location,
578    },
579
580    #[snafu(display("Missing required parameter, param: {:?}", param))]
581    MissingRequiredParameter { param: String },
582
583    #[snafu(display("Failed to start procedure manager"))]
584    StartProcedureManager {
585        #[snafu(implicit)]
586        location: Location,
587        source: common_procedure::Error,
588    },
589
590    #[snafu(display("Failed to stop procedure manager"))]
591    StopProcedureManager {
592        #[snafu(implicit)]
593        location: Location,
594        source: common_procedure::Error,
595    },
596
597    #[snafu(display("Failed to wait procedure done"))]
598    WaitProcedure {
599        #[snafu(implicit)]
600        location: Location,
601        source: common_procedure::Error,
602    },
603
604    #[snafu(display("Failed to query procedure state"))]
605    QueryProcedure {
606        #[snafu(implicit)]
607        location: Location,
608        source: common_procedure::Error,
609    },
610
611    #[snafu(display("Procedure not found: {pid}"))]
612    ProcedureNotFound {
613        #[snafu(implicit)]
614        location: Location,
615        pid: String,
616    },
617
618    #[snafu(display("Failed to submit procedure"))]
619    SubmitProcedure {
620        #[snafu(implicit)]
621        location: Location,
622        source: common_procedure::Error,
623    },
624
625    #[snafu(display("A prune task for topic {} is already running", topic))]
626    PruneTaskAlreadyRunning {
627        topic: String,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Schema already exists, name: {schema_name}"))]
633    SchemaAlreadyExists {
634        schema_name: String,
635        #[snafu(implicit)]
636        location: Location,
637    },
638
639    #[snafu(display("Table already exists: {table_name}"))]
640    TableAlreadyExists {
641        table_name: String,
642        #[snafu(implicit)]
643        location: Location,
644    },
645
646    #[snafu(display("Pusher not found: {pusher_id}"))]
647    PusherNotFound {
648        pusher_id: String,
649        #[snafu(implicit)]
650        location: Location,
651    },
652
653    #[snafu(display("Failed to push message: {err_msg}"))]
654    PushMessage {
655        err_msg: String,
656        #[snafu(implicit)]
657        location: Location,
658    },
659
660    #[snafu(display("Mailbox already closed: {id}"))]
661    MailboxClosed {
662        id: u64,
663        #[snafu(implicit)]
664        location: Location,
665    },
666
667    #[snafu(display("Mailbox timeout: {id}"))]
668    MailboxTimeout {
669        id: u64,
670        #[snafu(implicit)]
671        location: Location,
672    },
673
674    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
675    MailboxReceiver {
676        id: u64,
677        err_msg: String,
678        #[snafu(implicit)]
679        location: Location,
680    },
681
682    #[snafu(display("Mailbox channel closed: {channel}"))]
683    MailboxChannelClosed {
684        channel: Channel,
685        #[snafu(implicit)]
686        location: Location,
687    },
688
689    #[snafu(display("Missing request header"))]
690    MissingRequestHeader {
691        #[snafu(implicit)]
692        location: Location,
693    },
694
695    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
696    RegisterProcedureLoader {
697        type_name: String,
698        #[snafu(implicit)]
699        location: Location,
700        source: common_procedure::error::Error,
701    },
702
703    #[snafu(display(
704        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
705        mailbox_message,
706        reason
707    ))]
708    UnexpectedInstructionReply {
709        mailbox_message: String,
710        reason: String,
711        #[snafu(implicit)]
712        location: Location,
713    },
714
715    #[snafu(display("Expected to retry later, reason: {}", reason))]
716    RetryLater {
717        reason: String,
718        #[snafu(implicit)]
719        location: Location,
720    },
721
722    #[snafu(display("Expected to retry later, reason: {}", reason))]
723    RetryLaterWithSource {
724        reason: String,
725        #[snafu(implicit)]
726        location: Location,
727        source: BoxedError,
728    },
729
730    #[snafu(display("Failed to convert proto data"))]
731    ConvertProtoData {
732        #[snafu(implicit)]
733        location: Location,
734        source: common_meta::error::Error,
735    },
736
737    // this error is used for custom error mapping
738    // please do not delete it
739    #[snafu(display("Other error"))]
740    Other {
741        source: BoxedError,
742        #[snafu(implicit)]
743        location: Location,
744    },
745
746    #[snafu(display("Table metadata manager error"))]
747    TableMetadataManager {
748        source: common_meta::error::Error,
749        #[snafu(implicit)]
750        location: Location,
751    },
752
753    #[snafu(display("Runtime switch manager error"))]
754    RuntimeSwitchManager {
755        source: common_meta::error::Error,
756        #[snafu(implicit)]
757        location: Location,
758    },
759
760    #[snafu(display("Keyvalue backend error"))]
761    KvBackend {
762        source: common_meta::error::Error,
763        #[snafu(implicit)]
764        location: Location,
765    },
766
767    #[snafu(display("Failed to publish message"))]
768    PublishMessage {
769        #[snafu(source)]
770        error: SendError<Message>,
771        #[snafu(implicit)]
772        location: Location,
773    },
774
775    #[snafu(display("Too many partitions"))]
776    TooManyPartitions {
777        #[snafu(implicit)]
778        location: Location,
779    },
780
781    #[snafu(display("Failed to create repartition subtasks"))]
782    RepartitionCreateSubtasks {
783        source: partition::error::Error,
784        #[snafu(implicit)]
785        location: Location,
786    },
787
788    #[snafu(display(
789        "Source partition expression '{}' does not match any existing region",
790        expr
791    ))]
792    RepartitionSourceExprMismatch {
793        expr: String,
794        #[snafu(implicit)]
795        location: Location,
796    },
797
798    #[snafu(display(
799        "Failed to get the state receiver for repartition subprocedure {}",
800        procedure_id
801    ))]
802    RepartitionSubprocedureStateReceiver {
803        procedure_id: ProcedureId,
804        #[snafu(source)]
805        source: common_procedure::Error,
806        #[snafu(implicit)]
807        location: Location,
808    },
809
810    #[snafu(display("Unsupported operation {}", operation))]
811    Unsupported {
812        operation: String,
813        #[snafu(implicit)]
814        location: Location,
815    },
816
817    #[snafu(display("Unexpected table route type: {}", err_msg))]
818    UnexpectedLogicalRouteTable {
819        #[snafu(implicit)]
820        location: Location,
821        err_msg: String,
822        source: common_meta::error::Error,
823    },
824
825    #[snafu(display("Failed to save cluster info"))]
826    SaveClusterInfo {
827        #[snafu(implicit)]
828        location: Location,
829        source: common_meta::error::Error,
830    },
831
832    #[snafu(display("Invalid cluster info format"))]
833    InvalidClusterInfoFormat {
834        #[snafu(implicit)]
835        location: Location,
836        source: common_meta::error::Error,
837    },
838
839    #[snafu(display("Invalid datanode stat format"))]
840    InvalidDatanodeStatFormat {
841        #[snafu(implicit)]
842        location: Location,
843        source: common_meta::error::Error,
844    },
845
846    #[snafu(display("Invalid node info format"))]
847    InvalidNodeInfoFormat {
848        #[snafu(implicit)]
849        location: Location,
850        source: common_meta::error::Error,
851    },
852
853    #[snafu(display("Failed to serialize options to TOML"))]
854    TomlFormat {
855        #[snafu(implicit)]
856        location: Location,
857        #[snafu(source(from(common_config::error::Error, Box::new)))]
858        source: Box<common_config::error::Error>,
859    },
860
861    #[cfg(feature = "pg_kvbackend")]
862    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
863    PostgresExecution {
864        #[snafu(source)]
865        error: tokio_postgres::Error,
866        sql: String,
867        #[snafu(implicit)]
868        location: Location,
869    },
870
871    #[cfg(feature = "pg_kvbackend")]
872    #[snafu(display("Failed to get Postgres client"))]
873    GetPostgresClient {
874        #[snafu(implicit)]
875        location: Location,
876        #[snafu(source)]
877        error: deadpool::managed::PoolError<tokio_postgres::Error>,
878    },
879
880    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
881    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
882    SqlExecutionTimeout {
883        #[snafu(implicit)]
884        location: Location,
885        sql: String,
886        duration: std::time::Duration,
887    },
888
889    #[cfg(feature = "pg_kvbackend")]
890    #[snafu(display("Failed to create connection pool for Postgres"))]
891    CreatePostgresPool {
892        #[snafu(source)]
893        error: deadpool_postgres::CreatePoolError,
894        #[snafu(implicit)]
895        location: Location,
896    },
897
898    #[cfg(feature = "pg_kvbackend")]
899    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
900    GetPostgresConnection {
901        reason: String,
902        #[snafu(implicit)]
903        location: Location,
904    },
905
906    #[cfg(feature = "mysql_kvbackend")]
907    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
908    MySqlExecution {
909        #[snafu(source)]
910        error: sqlx::Error,
911        #[snafu(implicit)]
912        location: Location,
913        sql: String,
914    },
915
916    #[cfg(feature = "mysql_kvbackend")]
917    #[snafu(display("Failed to create mysql pool"))]
918    CreateMySqlPool {
919        #[snafu(source)]
920        error: sqlx::Error,
921        #[snafu(implicit)]
922        location: Location,
923    },
924
925    #[cfg(feature = "mysql_kvbackend")]
926    #[snafu(display("Failed to acquire mysql client from pool"))]
927    AcquireMySqlClient {
928        #[snafu(source)]
929        error: sqlx::Error,
930        #[snafu(implicit)]
931        location: Location,
932    },
933
934    #[snafu(display("Handler not found: {}", name))]
935    HandlerNotFound {
936        name: String,
937        #[snafu(implicit)]
938        location: Location,
939    },
940
941    #[snafu(display("Flow state handler error"))]
942    FlowStateHandler {
943        #[snafu(implicit)]
944        location: Location,
945        source: common_meta::error::Error,
946    },
947
948    #[snafu(display("Failed to build wal provider"))]
949    BuildWalProvider {
950        #[snafu(implicit)]
951        location: Location,
952        source: common_meta::error::Error,
953    },
954
955    #[snafu(display("Failed to build kafka client."))]
956    BuildKafkaClient {
957        #[snafu(implicit)]
958        location: Location,
959        #[snafu(source)]
960        error: common_meta::error::Error,
961    },
962
963    #[snafu(display(
964        "Failed to build a Kafka partition client, topic: {}, partition: {}",
965        topic,
966        partition
967    ))]
968    BuildPartitionClient {
969        topic: String,
970        partition: i32,
971        #[snafu(implicit)]
972        location: Location,
973        #[snafu(source)]
974        error: rskafka::client::error::Error,
975    },
976
977    #[snafu(display(
978        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
979        topic,
980        partition,
981        offset
982    ))]
983    DeleteRecords {
984        #[snafu(implicit)]
985        location: Location,
986        #[snafu(source)]
987        error: rskafka::client::error::Error,
988        topic: String,
989        partition: i32,
990        offset: u64,
991    },
992
993    #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
994    GetOffset {
995        topic: String,
996        #[snafu(implicit)]
997        location: Location,
998        #[snafu(source)]
999        error: rskafka::client::error::Error,
1000    },
1001
1002    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
1003    UpdateTopicNameValue {
1004        topic: String,
1005        #[snafu(implicit)]
1006        location: Location,
1007        #[snafu(source)]
1008        source: common_meta::error::Error,
1009    },
1010
1011    #[snafu(display("Failed to build tls options"))]
1012    BuildTlsOptions {
1013        #[snafu(implicit)]
1014        location: Location,
1015        #[snafu(source)]
1016        source: common_meta::error::Error,
1017    },
1018
1019    #[snafu(display(
1020        "Repartition group {} source region missing, region id: {}",
1021        group_id,
1022        region_id
1023    ))]
1024    RepartitionSourceRegionMissing {
1025        group_id: Uuid,
1026        region_id: RegionId,
1027        #[snafu(implicit)]
1028        location: Location,
1029    },
1030
1031    #[snafu(display(
1032        "Repartition group {} target region missing, region id: {}",
1033        group_id,
1034        region_id
1035    ))]
1036    RepartitionTargetRegionMissing {
1037        group_id: Uuid,
1038        region_id: RegionId,
1039        #[snafu(implicit)]
1040        location: Location,
1041    },
1042
1043    #[snafu(display("Failed to serialize partition expression"))]
1044    SerializePartitionExpr {
1045        #[snafu(source)]
1046        source: partition::error::Error,
1047        #[snafu(implicit)]
1048        location: Location,
1049    },
1050
1051    #[snafu(display("Failed to deserialize partition expression"))]
1052    DeserializePartitionExpr {
1053        #[snafu(source)]
1054        source: partition::error::Error,
1055        #[snafu(implicit)]
1056        location: Location,
1057    },
1058
1059    #[snafu(display("Empty partition expression"))]
1060    EmptyPartitionExpr {
1061        #[snafu(implicit)]
1062        location: Location,
1063    },
1064
1065    #[snafu(display(
1066        "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1067        region_id,
1068        expected,
1069        actual
1070    ))]
1071    PartitionExprMismatch {
1072        region_id: RegionId,
1073        expected: String,
1074        actual: String,
1075        #[snafu(implicit)]
1076        location: Location,
1077    },
1078
1079    #[snafu(display("Failed to allocate regions for table: {}", table_id))]
1080    AllocateRegions {
1081        #[snafu(implicit)]
1082        location: Location,
1083        table_id: TableId,
1084        #[snafu(source)]
1085        source: common_meta::error::Error,
1086    },
1087
1088    #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
1089    DeallocateRegions {
1090        #[snafu(implicit)]
1091        location: Location,
1092        table_id: TableId,
1093        #[snafu(source)]
1094        source: common_meta::error::Error,
1095    },
1096
1097    #[snafu(display("Failed to build create request for table: {}", table_id))]
1098    BuildCreateRequest {
1099        #[snafu(implicit)]
1100        location: Location,
1101        table_id: TableId,
1102        #[snafu(source)]
1103        source: common_meta::error::Error,
1104    },
1105
1106    #[snafu(display("Failed to allocate region routes for table: {}", table_id))]
1107    AllocateRegionRoutes {
1108        #[snafu(implicit)]
1109        location: Location,
1110        table_id: TableId,
1111        #[snafu(source)]
1112        source: common_meta::error::Error,
1113    },
1114
1115    #[snafu(display("Failed to allocate wal options for table: {}", table_id))]
1116    AllocateWalOptions {
1117        #[snafu(implicit)]
1118        location: Location,
1119        table_id: TableId,
1120        #[snafu(source)]
1121        source: common_meta::error::Error,
1122    },
1123}
1124
1125impl Error {
1126    /// Returns `true` if the error is retryable.
1127    pub fn is_retryable(&self) -> bool {
1128        self.retry_hint().is_retryable()
1129    }
1130}
1131
/// Shorthand for [`std::result::Result`] with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;

// NOTE(review): presumably generates the `Error` -> `tonic::Status` conversion; the macro is
// defined in `common_error`, so confirm its exact expansion there.
define_into_tonic_status!(Error);
1135
impl ErrorExt for Error {
    /// Returns the [`StatusCode`] associated with this error.
    ///
    /// Variants that bind a `source` report the status code of that source;
    /// every other variant maps to a fixed status code.
    fn status_code(&self) -> StatusCode {
        match self {
            // Fixed mapping: `StatusCode::Internal`.
            Error::EtcdFailed { .. }
            | Error::ConnectEtcd { .. }
            | Error::FileIo { .. }
            | Error::TcpBind { .. }
            | Error::SerializeConfig { .. }
            | Error::SerializeToJson { .. }
            | Error::DeserializeFromJson { .. }
            | Error::NoLeader { .. }
            | Error::LeaderLeaseExpired { .. }
            | Error::LeaderLeaseChanged { .. }
            | Error::CreateChannel { .. }
            | Error::BatchGet { .. }
            | Error::Range { .. }
            | Error::ResponseHeaderNotFound { .. }
            | Error::InvalidHttpBody { .. }
            | Error::ExceededRetryLimit { .. }
            | Error::SendShutdownSignal { .. }
            | Error::PushMessage { .. }
            | Error::MailboxClosed { .. }
            | Error::MailboxReceiver { .. }
            | Error::StartGrpc { .. }
            | Error::PublishMessage { .. }
            | Error::Join { .. }
            | Error::ChooseItems { .. }
            | Error::FlowStateHandler { .. }
            | Error::BuildWalProvider { .. }
            | Error::BuildPartitionClient { .. }
            | Error::BuildKafkaClient { .. } => StatusCode::Internal,

            // Fixed mapping: `StatusCode::Unexpected`.
            Error::DeleteRecords { .. }
            | Error::GetOffset { .. }
            | Error::PeerUnavailable { .. }
            | Error::PusherNotFound { .. } => StatusCode::Unexpected,
            // Timeouts and exceeded deadlines are reported as cancelled.
            Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
            Error::PruneTaskAlreadyRunning { .. }
            | Error::RetryLater { .. }
            | Error::MailboxChannelClosed { .. }
            | Error::IsNotLeader { .. } => StatusCode::IllegalState,
            // Delegates to the status code of the wrapped source.
            Error::RetryLaterWithSource { source, .. } => source.status_code(),
            Error::SerializePartitionExpr { source, .. }
            | Error::DeserializePartitionExpr { source, .. } => source.status_code(),

            Error::Unsupported { .. } => StatusCode::Unsupported,

            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,

            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
            // Fixed mapping: `StatusCode::InvalidArguments`.
            Error::EmptyKey { .. }
            | Error::MissingRequiredParameter { .. }
            | Error::MissingRequestHeader { .. }
            | Error::InvalidLeaseKey { .. }
            | Error::InvalidStatKey { .. }
            | Error::InvalidInactiveRegionKey { .. }
            | Error::ParseNum { .. }
            | Error::ParseBool { .. }
            | Error::ParseAddr { .. }
            | Error::UnsupportedSelectorType { .. }
            | Error::InvalidArguments { .. }
            | Error::ProcedureNotFound { .. }
            | Error::TooManyPartitions { .. }
            | Error::TomlFormat { .. }
            | Error::HandlerNotFound { .. }
            | Error::LeaderPeerChanged { .. }
            | Error::RepartitionSourceRegionMissing { .. }
            | Error::RepartitionTargetRegionMissing { .. }
            | Error::PartitionExprMismatch { .. }
            | Error::RepartitionSourceExprMismatch { .. }
            | Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
            // Fixed mapping: `StatusCode::Unexpected`.
            Error::LeaseKeyFromUtf8 { .. }
            | Error::LeaseValueFromUtf8 { .. }
            | Error::InvalidRegionKeyFromUtf8 { .. }
            | Error::TableRouteNotFound { .. }
            | Error::TableInfoNotFound { .. }
            | Error::DatanodeTableNotFound { .. }
            | Error::InvalidUtf8Value { .. }
            | Error::UnexpectedInstructionReply { .. }
            | Error::Unexpected { .. }
            | Error::RegionOperatingRace { .. }
            | Error::RegionRouteNotFound { .. }
            | Error::MigrationAbort { .. }
            | Error::MigrationRunning { .. }
            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
            Error::TableNotFound { .. } => StatusCode::TableNotFound,
            // From here on, most arms delegate to the status code of the wrapped source.
            // NOTE(review): the delegating arms are presumably kept separate because every
            // alternative of an or-pattern must bind `source` with the same type; confirm
            // against the variant definitions before merging any of them.
            Error::SaveClusterInfo { source, .. }
            | Error::InvalidClusterInfoFormat { source, .. }
            | Error::InvalidDatanodeStatFormat { source, .. }
            | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
            Error::InvalidateTableCache { source, .. } => source.status_code(),
            Error::SubmitProcedure { source, .. }
            | Error::WaitProcedure { source, .. }
            | Error::QueryProcedure { source, .. } => source.status_code(),
            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
                source.status_code()
            }
            Error::StartProcedureManager { source, .. }
            | Error::StopProcedureManager { source, .. } => source.status_code(),

            Error::ListCatalogs { source, .. }
            | Error::ListSchemas { source, .. }
            | Error::ListTables { source, .. } => source.status_code(),
            Error::StartTelemetryTask { source, .. } => source.status_code(),

            Error::NextSequence { source, .. }
            | Error::SetNextSequence { source, .. }
            | Error::PeekSequence { source, .. } => source.status_code(),
            Error::DowngradeLeader { source, .. } => source.status_code(),
            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
            Error::SubmitDdlTask { source, .. }
            | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
            Error::ConvertProtoData { source, .. }
            | Error::TableMetadataManager { source, .. }
            | Error::RuntimeSwitchManager { source, .. }
            | Error::KvBackend { source, .. }
            | Error::UnexpectedLogicalRouteTable { source, .. }
            | Error::UpdateTopicNameValue { source, .. } => source.status_code(),
            Error::ListActiveFrontends { source, .. }
            | Error::ListActiveDatanodes { source, .. }
            | Error::ListActiveFlownodes { source, .. } => source.status_code(),
            Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,

            Error::InitMetadata { source, .. }
            | Error::InitDdlManager { source, .. }
            | Error::InitReconciliationManager { source, .. } => source.status_code(),

            Error::BuildTlsOptions { source, .. } => source.status_code(),
            Error::Other { source, .. } => source.status_code(),
            Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
            Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
            Error::AllocateRegions { source, .. } => source.status_code(),
            Error::DeallocateRegions { source, .. } => source.status_code(),
            Error::AllocateRegionRoutes { source, .. } => source.status_code(),
            Error::AllocateWalOptions { source, .. } => source.status_code(),
            Error::BuildCreateRequest { source, .. } => source.status_code(),
            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,

            // Variants below exist only when the matching kv-backend feature is enabled;
            // all of them map to `StatusCode::Internal`.
            #[cfg(feature = "pg_kvbackend")]
            Error::CreatePostgresPool { .. }
            | Error::GetPostgresClient { .. }
            | Error::GetPostgresConnection { .. }
            | Error::PostgresExecution { .. } => StatusCode::Internal,
            #[cfg(feature = "mysql_kvbackend")]
            Error::MySqlExecution { .. }
            | Error::CreateMySqlPool { .. }
            | Error::ParseMySqlUrl { .. }
            | Error::DecodeSqlValue { .. }
            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
        }
    }

    /// Returns this error as a `&dyn Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns the [`RetryHint`] for this error.
    ///
    /// Some variants are always retryable, some derive the hint from the
    /// wrapped `source`/`error`, and anything not listed explicitly falls
    /// through to `RetryHint::NonRetryable`.
    fn retry_hint(&self) -> RetryHint {
        match self {
            // Always retryable.
            Error::RetryLater { .. }
            | Error::RetryLaterWithSource { .. }
            | Error::MailboxTimeout { .. }
            | Error::NoEnoughAvailableNode { .. }
            | Error::NoLeader { .. }
            | Error::LeaderLeaseExpired { .. }
            | Error::LeaderLeaseChanged { .. }
            | Error::PeerUnavailable { .. } => RetryHint::Retryable,

            // Hint is computed from the underlying etcd error.
            Error::ConnectEtcd { error, .. } | Error::EtcdFailed { error, .. } => {
                common_meta::error::retry_hint_from_etcd_error(error)
            }

            // Feature-gated SQL backends: hint is computed from the underlying driver error.
            #[cfg(feature = "pg_kvbackend")]
            Error::PostgresExecution { error, .. } => {
                common_meta::error::retry_hint_from_postgres_error(error)
            }
            #[cfg(feature = "pg_kvbackend")]
            Error::GetPostgresClient { error, .. } => {
                common_meta::error::retry_hint_from_postgres_pool_error(error)
            }
            #[cfg(feature = "mysql_kvbackend")]
            Error::MySqlExecution { error, .. }
            | Error::CreateMySqlPool { error, .. }
            | Error::AcquireMySqlClient { error, .. } => {
                common_meta::error::retry_hint_from_sqlx_error(error)
            }
            // SQL execution timeouts are always reported as retryable.
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            Error::SqlExecutionTimeout { .. } => RetryHint::Retryable,

            // Wrapper variants: defer to the retry hint of the wrapped source.
            // NOTE(review): the delegating arms below are presumably split into groups
            // because every alternative of an or-pattern must bind `source` with the same
            // type; confirm against the variant definitions before merging any of them.
            Error::ListActiveFrontends { source, .. }
            | Error::ListActiveDatanodes { source, .. }
            | Error::ListActiveFlownodes { source, .. }
            | Error::InitDdlManager { source, .. }
            | Error::InitReconciliationManager { source, .. }
            | Error::InitMetadata { source, .. }
            | Error::NextSequence { source, .. }
            | Error::SetNextSequence { source, .. }
            | Error::PeekSequence { source, .. }
            | Error::SubmitDdlTask { source, .. }
            | Error::SubmitReconcileProcedure { source, .. }
            | Error::InvalidateTableCache { source, .. }
            | Error::ConvertProtoData { source, .. }
            | Error::TableMetadataManager { source, .. }
            | Error::RuntimeSwitchManager { source, .. }
            | Error::KvBackend { source, .. }
            | Error::UnexpectedLogicalRouteTable { source, .. }
            | Error::SaveClusterInfo { source, .. }
            | Error::InvalidClusterInfoFormat { source, .. }
            | Error::InvalidDatanodeStatFormat { source, .. }
            | Error::InvalidNodeInfoFormat { source, .. }
            | Error::FlowStateHandler { source, .. }
            | Error::BuildWalProvider { source, .. }
            | Error::BuildKafkaClient { error: source, .. }
            | Error::UpdateTopicNameValue { source, .. }
            | Error::BuildTlsOptions { source, .. }
            | Error::AllocateRegions { source, .. }
            | Error::DeallocateRegions { source, .. }
            | Error::BuildCreateRequest { source, .. }
            | Error::AllocateRegionRoutes { source, .. }
            | Error::AllocateWalOptions { source, .. } => source.retry_hint(),

            Error::Other { source, .. }
            | Error::ListCatalogs { source, .. }
            | Error::ListSchemas { source, .. }
            | Error::ListTables { source, .. }
            | Error::DowngradeLeader { source, .. } => source.retry_hint(),

            Error::SubmitProcedure { source, .. }
            | Error::WaitProcedure { source, .. }
            | Error::QueryProcedure { source, .. }
            | Error::StartProcedureManager { source, .. }
            | Error::StopProcedureManager { source, .. }
            | Error::RegisterProcedureLoader { source, .. }
            | Error::RepartitionSubprocedureStateReceiver { source, .. } => source.retry_hint(),

            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
                source.retry_hint()
            }
            Error::StartTelemetryTask { source, .. } => source.retry_hint(),
            Error::CreateChannel { source, .. } => source.retry_hint(),
            Error::RepartitionCreateSubtasks { source, .. } => source.retry_hint(),
            Error::SerializePartitionExpr { source, .. }
            | Error::DeserializePartitionExpr { source, .. } => source.retry_hint(),

            // Kafka client errors: hint is computed by `rskafka_client_error_to_retry_hint`.
            Error::DeleteRecords { error, .. }
            | Error::BuildPartitionClient { error, .. }
            | Error::GetOffset { error, .. } => rskafka_client_error_to_retry_hint(error),

            // Listed explicitly as non-retryable; the catch-all arm below yields the same hint.
            Error::PusherNotFound { .. }
            | Error::PushMessage { .. }
            | Error::ExceededDeadline { .. } => RetryHint::NonRetryable,

            // Any variant not matched above is treated as non-retryable.
            _ => RetryHint::NonRetryable,
        }
    }
}
1393
1394// for form tonic
1395pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1396    let mut err: &(dyn std::error::Error + 'static) = err_status;
1397
1398    loop {
1399        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1400            return Some(io_err);
1401        }
1402
1403        // h2::Error do not expose std::io::Error with `source()`
1404        // https://github.com/hyperium/h2/pull/462
1405        if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1406            && let Some(io_err) = h2_err.get_io()
1407        {
1408            return Some(io_err);
1409        }
1410
1411        err = err.source()?;
1412    }
1413}
1414
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_error::ext::ErrorExt;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use rskafka::BackoffError;
    use rskafka::client::error::Error as KafkaClientError;
    use snafu::ResultExt;

    use super::{
        BuildPartitionClientSnafu, DeallocateRegionsSnafu, DeleteRecordsSnafu, GetOffsetSnafu,
    };

    /// Topic name shared by all Kafka-related fixtures.
    const TOPIC: &str = "test_topic";

    /// Builds a Kafka client error of the `RetryFailed` kind.
    fn retry_failed_kafka_error() -> KafkaClientError {
        let backoff = BackoffError::DeadlineExceded {
            deadline: Duration::from_secs(1),
            source: Box::new(std::io::Error::other("retry failed")),
        };
        KafkaClientError::RetryFailed(backoff)
    }

    /// Builds a Kafka client error of the `InvalidResponse` kind.
    fn invalid_response_kafka_error() -> KafkaClientError {
        KafkaClientError::InvalidResponse("invalid".into())
    }

    /// Wraps `source` into a `DeallocateRegions` error.
    fn deallocate_regions_error(source: common_meta::error::Error) -> super::Error {
        Err::<(), _>(source)
            .context(DeallocateRegionsSnafu { table_id: 1024_u32 })
            .unwrap_err()
    }

    /// Produces the `DeleteRecords`, `BuildPartitionClient` and `GetOffset` errors (in that
    /// order), each wrapping a fresh source obtained from `make_source`.
    fn kafka_backed_errors(make_source: fn() -> KafkaClientError) -> [super::Error; 3] {
        let delete_records = Err::<(), _>(make_source())
            .context(DeleteRecordsSnafu {
                topic: TOPIC,
                partition: 0,
                offset: 1024u64,
            })
            .unwrap_err();
        let build_partition_client = Err::<(), _>(make_source())
            .context(BuildPartitionClientSnafu {
                topic: TOPIC,
                partition: 0,
            })
            .unwrap_err();
        let get_offset = Err::<(), _>(make_source())
            .context(GetOffsetSnafu { topic: TOPIC })
            .unwrap_err();

        [delete_records, build_partition_client, get_offset]
    }

    #[test]
    fn test_deallocate_regions_is_retryable_when_source_is_retry_later() {
        let retry_later =
            common_meta::error::Error::retry_later(MockError::new(StatusCode::Internal));
        let wrapped = deallocate_regions_error(retry_later);

        assert!(wrapped.is_retryable());
        assert!(wrapped.retry_hint().is_retryable());
    }

    #[test]
    fn test_deallocate_regions_is_not_retryable_when_source_is_not_retry_later() {
        let unexpected = common_meta::error::UnexpectedSnafu {
            err_msg: "mock error",
        }
        .build();
        let wrapped = deallocate_regions_error(unexpected);

        assert!(!wrapped.is_retryable());
        assert!(!wrapped.retry_hint().is_retryable());
    }

    #[test]
    fn test_kafka_retry_failed_errors_are_retryable() {
        let errors = kafka_backed_errors(retry_failed_kafka_error);

        for err in &errors {
            assert!(err.is_retryable());
        }
        for err in &errors {
            assert!(err.retry_hint().is_retryable());
        }
    }

    #[test]
    fn test_kafka_non_retry_failed_errors_are_not_retryable() {
        let errors = kafka_backed_errors(invalid_response_kafka_error);

        for err in &errors {
            assert!(!err.is_retryable());
        }
        for err in &errors {
            assert!(!err.retry_hint().is_retryable());
        }
    }
}