common_meta/
error.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_wal::options::WalOptions;
22use serde_json::error::Error as JsonError;
23use snafu::{Location, Snafu};
24use store_api::storage::RegionId;
25use table::metadata::TableId;
26
27use crate::peer::Peer;
28use crate::DatanodeId;
29
30#[derive(Snafu)]
31#[snafu(visibility(pub))]
32#[stack_trace_debug]
33pub enum Error {
34    #[snafu(display("Empty key is not allowed"))]
35    EmptyKey {
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display(
41        "Another procedure is operating the region: {} on peer: {}",
42        region_id,
43        peer_id
44    ))]
45    RegionOperatingRace {
46        #[snafu(implicit)]
47        location: Location,
48        peer_id: DatanodeId,
49        region_id: RegionId,
50    },
51
52    #[snafu(display("Failed to connect to Etcd"))]
53    ConnectEtcd {
54        #[snafu(source)]
55        error: etcd_client::Error,
56        #[snafu(implicit)]
57        location: Location,
58    },
59
60    #[snafu(display("Failed to execute via Etcd"))]
61    EtcdFailed {
62        #[snafu(source)]
63        error: etcd_client::Error,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
69    EtcdTxnFailed {
70        max_operations: usize,
71        #[snafu(source)]
72        error: etcd_client::Error,
73        #[snafu(implicit)]
74        location: Location,
75    },
76
77    #[snafu(display("Failed to get sequence: {}", err_msg))]
78    NextSequence {
79        err_msg: String,
80        #[snafu(implicit)]
81        location: Location,
82    },
83
84    #[snafu(display("Unexpected sequence value: {}", err_msg))]
85    UnexpectedSequenceValue {
86        err_msg: String,
87        #[snafu(implicit)]
88        location: Location,
89    },
90
91    #[snafu(display("Table info not found: {}", table))]
92    TableInfoNotFound {
93        table: String,
94        #[snafu(implicit)]
95        location: Location,
96    },
97
98    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
99    RegisterProcedureLoader {
100        type_name: String,
101        #[snafu(implicit)]
102        location: Location,
103        source: common_procedure::error::Error,
104    },
105
106    #[snafu(display("Failed to submit procedure"))]
107    SubmitProcedure {
108        #[snafu(implicit)]
109        location: Location,
110        source: common_procedure::Error,
111    },
112
113    #[snafu(display("Failed to query procedure"))]
114    QueryProcedure {
115        #[snafu(implicit)]
116        location: Location,
117        source: common_procedure::Error,
118    },
119
120    #[snafu(display("Procedure not found: {pid}"))]
121    ProcedureNotFound {
122        #[snafu(implicit)]
123        location: Location,
124        pid: String,
125    },
126
127    #[snafu(display("Failed to parse procedure id: {key}"))]
128    ParseProcedureId {
129        #[snafu(implicit)]
130        location: Location,
131        key: String,
132        #[snafu(source)]
133        error: common_procedure::ParseIdError,
134    },
135
136    #[snafu(display("Unsupported operation {}", operation))]
137    Unsupported {
138        operation: String,
139        #[snafu(implicit)]
140        location: Location,
141    },
142
143    #[snafu(display("Failed to wait procedure done"))]
144    WaitProcedure {
145        #[snafu(implicit)]
146        location: Location,
147        source: common_procedure::Error,
148    },
149
150    #[snafu(display("Failed to start procedure manager"))]
151    StartProcedureManager {
152        #[snafu(implicit)]
153        location: Location,
154        source: common_procedure::Error,
155    },
156
157    #[snafu(display("Failed to stop procedure manager"))]
158    StopProcedureManager {
159        #[snafu(implicit)]
160        location: Location,
161        source: common_procedure::Error,
162    },
163
164    #[snafu(display(
165        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
166    ))]
167    ProcedureOutput {
168        procedure_id: String,
169        err_msg: String,
170        #[snafu(implicit)]
171        location: Location,
172    },
173
174    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
175    ConvertRawTableInfo {
176        #[snafu(implicit)]
177        location: Location,
178        source: datatypes::Error,
179    },
180
181    #[snafu(display("Primary key '{key}' not found when creating region request"))]
182    PrimaryKeyNotFound {
183        key: String,
184        #[snafu(implicit)]
185        location: Location,
186    },
187
188    #[snafu(display("Failed to build table meta for table: {}", table_name))]
189    BuildTableMeta {
190        table_name: String,
191        #[snafu(source)]
192        error: table::metadata::TableMetaBuilderError,
193        #[snafu(implicit)]
194        location: Location,
195    },
196
197    #[snafu(display("Table occurs error"))]
198    Table {
199        #[snafu(implicit)]
200        location: Location,
201        source: table::error::Error,
202    },
203
204    #[snafu(display("Failed to find table route for table id {}", table_id))]
205    TableRouteNotFound {
206        table_id: TableId,
207        #[snafu(implicit)]
208        location: Location,
209    },
210
211    #[snafu(display("Failed to decode protobuf"))]
212    DecodeProto {
213        #[snafu(implicit)]
214        location: Location,
215        #[snafu(source)]
216        error: prost::DecodeError,
217    },
218
219    #[snafu(display("Failed to encode object into json"))]
220    EncodeJson {
221        #[snafu(implicit)]
222        location: Location,
223        #[snafu(source)]
224        error: JsonError,
225    },
226
227    #[snafu(display("Failed to decode object from json"))]
228    DecodeJson {
229        #[snafu(implicit)]
230        location: Location,
231        #[snafu(source)]
232        error: JsonError,
233    },
234
235    #[snafu(display("Failed to serialize to json: {}", input))]
236    SerializeToJson {
237        input: String,
238        #[snafu(source)]
239        error: serde_json::error::Error,
240        #[snafu(implicit)]
241        location: Location,
242    },
243
244    #[snafu(display("Failed to deserialize from json: {}", input))]
245    DeserializeFromJson {
246        input: String,
247        #[snafu(source)]
248        error: serde_json::error::Error,
249        #[snafu(implicit)]
250        location: Location,
251    },
252
253    #[snafu(display("Payload not exist"))]
254    PayloadNotExist {
255        #[snafu(implicit)]
256        location: Location,
257    },
258
259    #[snafu(display("Failed to send message: {err_msg}"))]
260    SendMessage {
261        err_msg: String,
262        #[snafu(implicit)]
263        location: Location,
264    },
265
266    #[snafu(display("Failed to serde json"))]
267    SerdeJson {
268        #[snafu(source)]
269        error: serde_json::error::Error,
270        #[snafu(implicit)]
271        location: Location,
272    },
273
274    #[snafu(display("Failed to parse value {} into key {}", value, key))]
275    ParseOption {
276        key: String,
277        value: String,
278        #[snafu(implicit)]
279        location: Location,
280    },
281
282    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
283    RouteInfoCorrupted {
284        err_msg: String,
285        #[snafu(implicit)]
286        location: Location,
287    },
288
289    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
290    IllegalServerState {
291        code: i32,
292        err_msg: String,
293        #[snafu(implicit)]
294        location: Location,
295    },
296
297    #[snafu(display("Failed to convert alter table request"))]
298    ConvertAlterTableRequest {
299        source: common_grpc_expr::error::Error,
300        #[snafu(implicit)]
301        location: Location,
302    },
303
304    #[snafu(display("Invalid protobuf message: {err_msg}"))]
305    InvalidProtoMsg {
306        err_msg: String,
307        #[snafu(implicit)]
308        location: Location,
309    },
310
311    #[snafu(display("Unexpected: {err_msg}"))]
312    Unexpected {
313        err_msg: String,
314        #[snafu(implicit)]
315        location: Location,
316    },
317
318    #[snafu(display("Table already exists, table: {}", table_name))]
319    TableAlreadyExists {
320        table_name: String,
321        #[snafu(implicit)]
322        location: Location,
323    },
324
325    #[snafu(display("View already exists, view: {}", view_name))]
326    ViewAlreadyExists {
327        view_name: String,
328        #[snafu(implicit)]
329        location: Location,
330    },
331
332    #[snafu(display("Flow already exists: {}", flow_name))]
333    FlowAlreadyExists {
334        flow_name: String,
335        #[snafu(implicit)]
336        location: Location,
337    },
338
339    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
340    SchemaAlreadyExists {
341        catalog: String,
342        schema: String,
343        #[snafu(implicit)]
344        location: Location,
345    },
346
347    #[snafu(display("Failed to convert raw key to str"))]
348    ConvertRawKey {
349        #[snafu(implicit)]
350        location: Location,
351        #[snafu(source)]
352        error: Utf8Error,
353    },
354
355    #[snafu(display("Table not found: '{}'", table_name))]
356    TableNotFound {
357        table_name: String,
358        #[snafu(implicit)]
359        location: Location,
360    },
361
362    #[snafu(display("View not found: '{}'", view_name))]
363    ViewNotFound {
364        view_name: String,
365        #[snafu(implicit)]
366        location: Location,
367    },
368
369    #[snafu(display("Flow not found: '{}'", flow_name))]
370    FlowNotFound {
371        flow_name: String,
372        #[snafu(implicit)]
373        location: Location,
374    },
375
376    #[snafu(display("Flow route not found: '{}'", flow_name))]
377    FlowRouteNotFound {
378        flow_name: String,
379        #[snafu(implicit)]
380        location: Location,
381    },
382
383    #[snafu(display("Schema nod found, schema: {}", table_schema))]
384    SchemaNotFound {
385        table_schema: String,
386        #[snafu(implicit)]
387        location: Location,
388    },
389
390    #[snafu(display("Invalid metadata, err: {}", err_msg))]
391    InvalidMetadata {
392        err_msg: String,
393        #[snafu(implicit)]
394        location: Location,
395    },
396
397    #[snafu(display("Invalid view info, err: {}", err_msg))]
398    InvalidViewInfo {
399        err_msg: String,
400        #[snafu(implicit)]
401        location: Location,
402    },
403
404    #[snafu(display("Invalid flow request body: {:?}", body))]
405    InvalidFlowRequestBody {
406        body: Box<Option<api::v1::flow::flow_request::Body>>,
407        #[snafu(implicit)]
408        location: Location,
409    },
410
411    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
412    GetKvCache { err_msg: String },
413
414    #[snafu(display("Get null from cache, key: {}", key))]
415    CacheNotGet {
416        key: String,
417        #[snafu(implicit)]
418        location: Location,
419    },
420
421    #[snafu(display("Etcd txn error: {err_msg}"))]
422    EtcdTxnOpResponse {
423        err_msg: String,
424        #[snafu(implicit)]
425        location: Location,
426    },
427
428    #[snafu(display("External error"))]
429    External {
430        #[snafu(implicit)]
431        location: Location,
432        source: BoxedError,
433    },
434
435    #[snafu(display("The response exceeded size limit"))]
436    ResponseExceededSizeLimit {
437        #[snafu(implicit)]
438        location: Location,
439        source: BoxedError,
440    },
441
442    #[snafu(display("Invalid heartbeat response"))]
443    InvalidHeartbeatResponse {
444        #[snafu(implicit)]
445        location: Location,
446    },
447
448    #[snafu(display("Failed to operate on datanode: {}", peer))]
449    OperateDatanode {
450        #[snafu(implicit)]
451        location: Location,
452        peer: Peer,
453        source: BoxedError,
454    },
455
456    #[snafu(display("Retry later"))]
457    RetryLater { source: BoxedError },
458
459    #[snafu(display("Abort procedure"))]
460    AbortProcedure {
461        #[snafu(implicit)]
462        location: Location,
463        source: BoxedError,
464        clean_poisons: bool,
465    },
466
467    #[snafu(display(
468        "Failed to encode a wal options to json string, wal_options: {:?}",
469        wal_options
470    ))]
471    EncodeWalOptions {
472        wal_options: WalOptions,
473        #[snafu(source)]
474        error: serde_json::Error,
475        #[snafu(implicit)]
476        location: Location,
477    },
478
479    #[snafu(display("Invalid number of topics {}", num_topics))]
480    InvalidNumTopics {
481        num_topics: usize,
482        #[snafu(implicit)]
483        location: Location,
484    },
485
486    #[snafu(display(
487        "Failed to build a Kafka client, broker endpoints: {:?}",
488        broker_endpoints
489    ))]
490    BuildKafkaClient {
491        broker_endpoints: Vec<String>,
492        #[snafu(implicit)]
493        location: Location,
494        #[snafu(source)]
495        error: rskafka::client::error::Error,
496    },
497
498    #[snafu(display("Failed to create TLS Config"))]
499    TlsConfig {
500        #[snafu(implicit)]
501        location: Location,
502        source: common_wal::error::Error,
503    },
504
505    #[snafu(display("Failed to resolve Kafka broker endpoint."))]
506    ResolveKafkaEndpoint { source: common_wal::error::Error },
507
508    #[snafu(display("Failed to build a Kafka controller client"))]
509    BuildKafkaCtrlClient {
510        #[snafu(implicit)]
511        location: Location,
512        #[snafu(source)]
513        error: rskafka::client::error::Error,
514    },
515
516    #[snafu(display(
517        "Failed to get a Kafka partition client, topic: {}, partition: {}",
518        topic,
519        partition
520    ))]
521    KafkaPartitionClient {
522        topic: String,
523        partition: i32,
524        #[snafu(implicit)]
525        location: Location,
526        #[snafu(source)]
527        error: rskafka::client::error::Error,
528    },
529
530    #[snafu(display(
531        "Failed to get offset from Kafka, topic: {}, partition: {}",
532        topic,
533        partition
534    ))]
535    KafkaGetOffset {
536        topic: String,
537        partition: i32,
538        #[snafu(implicit)]
539        location: Location,
540        #[snafu(source)]
541        error: rskafka::client::error::Error,
542    },
543
544    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
545    ProduceRecord {
546        topic: String,
547        #[snafu(implicit)]
548        location: Location,
549        #[snafu(source)]
550        error: rskafka::client::error::Error,
551    },
552
553    #[snafu(display("Failed to create a Kafka wal topic"))]
554    CreateKafkaWalTopic {
555        #[snafu(implicit)]
556        location: Location,
557        #[snafu(source)]
558        error: rskafka::client::error::Error,
559    },
560
561    #[snafu(display("The topic pool is empty"))]
562    EmptyTopicPool {
563        #[snafu(implicit)]
564        location: Location,
565    },
566
567    #[snafu(display("Unexpected table route type: {}", err_msg))]
568    UnexpectedLogicalRouteTable {
569        #[snafu(implicit)]
570        location: Location,
571        err_msg: String,
572    },
573
574    #[snafu(display("The tasks of {} cannot be empty", name))]
575    EmptyDdlTasks {
576        name: String,
577        #[snafu(implicit)]
578        location: Location,
579    },
580
581    #[snafu(display("Metadata corruption: {}", err_msg))]
582    MetadataCorruption {
583        err_msg: String,
584        #[snafu(implicit)]
585        location: Location,
586    },
587
588    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
589    AlterLogicalTablesInvalidArguments {
590        err_msg: String,
591        #[snafu(implicit)]
592        location: Location,
593    },
594
595    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
596    CreateLogicalTablesInvalidArguments {
597        err_msg: String,
598        #[snafu(implicit)]
599        location: Location,
600    },
601
602    #[snafu(display("Invalid node info key: {}", key))]
603    InvalidNodeInfoKey {
604        key: String,
605        #[snafu(implicit)]
606        location: Location,
607    },
608
609    #[snafu(display("Invalid node stat key: {}", key))]
610    InvalidStatKey {
611        key: String,
612        #[snafu(implicit)]
613        location: Location,
614    },
615
616    #[snafu(display("Failed to parse number: {}", err_msg))]
617    ParseNum {
618        err_msg: String,
619        #[snafu(source)]
620        error: std::num::ParseIntError,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display("Invalid role: {}", role))]
626    InvalidRole {
627        role: i32,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
633    InvalidSetDatabaseOption {
634        key: String,
635        value: String,
636        #[snafu(implicit)]
637        location: Location,
638    },
639
640    #[snafu(display("Invalid unset database option, key: {}", key))]
641    InvalidUnsetDatabaseOption {
642        key: String,
643        #[snafu(implicit)]
644        location: Location,
645    },
646
647    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
648    MismatchPrefix {
649        prefix: String,
650        key: String,
651        #[snafu(implicit)]
652        location: Location,
653    },
654
655    #[snafu(display("Failed to move values: {err_msg}"))]
656    MoveValues {
657        err_msg: String,
658        #[snafu(implicit)]
659        location: Location,
660    },
661
662    #[snafu(display("Failed to parse {} from utf8", name))]
663    FromUtf8 {
664        name: String,
665        #[snafu(source)]
666        error: std::string::FromUtf8Error,
667        #[snafu(implicit)]
668        location: Location,
669    },
670
671    #[snafu(display("Value not exists"))]
672    ValueNotExist {
673        #[snafu(implicit)]
674        location: Location,
675    },
676
677    #[snafu(display("Failed to get cache"))]
678    GetCache { source: Arc<Error> },
679
680    #[cfg(feature = "pg_kvbackend")]
681    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
682    PostgresExecution {
683        sql: String,
684        #[snafu(source)]
685        error: tokio_postgres::Error,
686        #[snafu(implicit)]
687        location: Location,
688    },
689
690    #[cfg(feature = "pg_kvbackend")]
691    #[snafu(display("Failed to create connection pool for Postgres"))]
692    CreatePostgresPool {
693        #[snafu(source)]
694        error: deadpool_postgres::CreatePoolError,
695        #[snafu(implicit)]
696        location: Location,
697    },
698
699    #[cfg(feature = "pg_kvbackend")]
700    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
701    GetPostgresConnection {
702        reason: String,
703        #[snafu(implicit)]
704        location: Location,
705    },
706
707    #[cfg(feature = "pg_kvbackend")]
708    #[snafu(display("Failed to {} Postgres transaction", operation))]
709    PostgresTransaction {
710        #[snafu(source)]
711        error: tokio_postgres::Error,
712        #[snafu(implicit)]
713        location: Location,
714        operation: String,
715    },
716
717    #[cfg(feature = "mysql_kvbackend")]
718    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
719    MySqlExecution {
720        sql: String,
721        #[snafu(source)]
722        error: sqlx::Error,
723        #[snafu(implicit)]
724        location: Location,
725    },
726
727    #[cfg(feature = "mysql_kvbackend")]
728    #[snafu(display("Failed to create connection pool for MySql"))]
729    CreateMySqlPool {
730        #[snafu(source)]
731        error: sqlx::Error,
732        #[snafu(implicit)]
733        location: Location,
734    },
735
736    #[cfg(feature = "mysql_kvbackend")]
737    #[snafu(display("Failed to {} MySql transaction", operation))]
738    MySqlTransaction {
739        #[snafu(source)]
740        error: sqlx::Error,
741        #[snafu(implicit)]
742        location: Location,
743        operation: String,
744    },
745
746    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
747    #[snafu(display("Rds transaction retry failed"))]
748    RdsTransactionRetryFailed {
749        #[snafu(implicit)]
750        location: Location,
751    },
752
753    #[snafu(display(
754        "Datanode table info not found, table id: {}, datanode id: {}",
755        table_id,
756        datanode_id
757    ))]
758    DatanodeTableInfoNotFound {
759        datanode_id: DatanodeId,
760        table_id: TableId,
761        #[snafu(implicit)]
762        location: Location,
763    },
764
765    #[snafu(display("Invalid topic name prefix: {}", prefix))]
766    InvalidTopicNamePrefix {
767        prefix: String,
768        #[snafu(implicit)]
769        location: Location,
770    },
771
772    #[snafu(display("Failed to parse wal options: {}", wal_options))]
773    ParseWalOptions {
774        wal_options: String,
775        #[snafu(implicit)]
776        location: Location,
777        #[snafu(source)]
778        error: serde_json::Error,
779    },
780
781    #[snafu(display("No leader found for table_id: {}", table_id))]
782    NoLeader {
783        table_id: TableId,
784        #[snafu(implicit)]
785        location: Location,
786    },
787
788    #[snafu(display(
789        "Procedure poison key already exists with a different value, key: {}, value: {}",
790        key,
791        value
792    ))]
793    ProcedurePoisonConflict {
794        key: String,
795        value: String,
796        #[snafu(implicit)]
797        location: Location,
798    },
799
800    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
801    PutPoison {
802        #[snafu(implicit)]
803        location: Location,
804        #[snafu(source)]
805        source: common_procedure::error::Error,
806    },
807
808    #[snafu(display("Failed to parse timezone"))]
809    InvalidTimeZone {
810        #[snafu(implicit)]
811        location: Location,
812        #[snafu(source)]
813        error: common_time::error::Error,
814    },
815}
816
817pub type Result<T> = std::result::Result<T, Error>;
818
819impl ErrorExt for Error {
820    fn status_code(&self) -> StatusCode {
821        use Error::*;
822        match self {
823            IllegalServerState { .. }
824            | EtcdTxnOpResponse { .. }
825            | EtcdFailed { .. }
826            | EtcdTxnFailed { .. }
827            | ConnectEtcd { .. }
828            | MoveValues { .. }
829            | GetCache { .. }
830            | SerializeToJson { .. }
831            | DeserializeFromJson { .. } => StatusCode::Internal,
832
833            NoLeader { .. } => StatusCode::TableUnavailable,
834            ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
835
836            Unsupported { .. } => StatusCode::Unsupported,
837
838            SerdeJson { .. }
839            | ParseOption { .. }
840            | RouteInfoCorrupted { .. }
841            | InvalidProtoMsg { .. }
842            | InvalidMetadata { .. }
843            | Unexpected { .. }
844            | TableInfoNotFound { .. }
845            | NextSequence { .. }
846            | UnexpectedSequenceValue { .. }
847            | InvalidHeartbeatResponse { .. }
848            | EncodeJson { .. }
849            | DecodeJson { .. }
850            | PayloadNotExist { .. }
851            | ConvertRawKey { .. }
852            | DecodeProto { .. }
853            | BuildTableMeta { .. }
854            | TableRouteNotFound { .. }
855            | ConvertRawTableInfo { .. }
856            | RegionOperatingRace { .. }
857            | EncodeWalOptions { .. }
858            | BuildKafkaClient { .. }
859            | BuildKafkaCtrlClient { .. }
860            | KafkaPartitionClient { .. }
861            | ResolveKafkaEndpoint { .. }
862            | ProduceRecord { .. }
863            | CreateKafkaWalTopic { .. }
864            | EmptyTopicPool { .. }
865            | UnexpectedLogicalRouteTable { .. }
866            | ProcedureOutput { .. }
867            | FromUtf8 { .. }
868            | MetadataCorruption { .. }
869            | ParseWalOptions { .. }
870            | KafkaGetOffset { .. } => StatusCode::Unexpected,
871
872            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
873
874            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
875
876            ProcedureNotFound { .. }
877            | InvalidViewInfo { .. }
878            | PrimaryKeyNotFound { .. }
879            | EmptyKey { .. }
880            | AlterLogicalTablesInvalidArguments { .. }
881            | CreateLogicalTablesInvalidArguments { .. }
882            | MismatchPrefix { .. }
883            | TlsConfig { .. }
884            | InvalidSetDatabaseOption { .. }
885            | InvalidUnsetDatabaseOption { .. }
886            | InvalidTopicNamePrefix { .. }
887            | InvalidTimeZone { .. } => StatusCode::InvalidArguments,
888            InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
889
890            FlowNotFound { .. } => StatusCode::FlowNotFound,
891            FlowRouteNotFound { .. } => StatusCode::Unexpected,
892            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
893
894            ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
895            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
896
897            SubmitProcedure { source, .. }
898            | QueryProcedure { source, .. }
899            | WaitProcedure { source, .. }
900            | StartProcedureManager { source, .. }
901            | StopProcedureManager { source, .. } => source.status_code(),
902            RegisterProcedureLoader { source, .. } => source.status_code(),
903            External { source, .. } => source.status_code(),
904            ResponseExceededSizeLimit { source, .. } => source.status_code(),
905            OperateDatanode { source, .. } => source.status_code(),
906            Table { source, .. } => source.status_code(),
907            RetryLater { source, .. } => source.status_code(),
908            AbortProcedure { source, .. } => source.status_code(),
909            ConvertAlterTableRequest { source, .. } => source.status_code(),
910            PutPoison { source, .. } => source.status_code(),
911
912            ParseProcedureId { .. }
913            | InvalidNumTopics { .. }
914            | SchemaNotFound { .. }
915            | InvalidNodeInfoKey { .. }
916            | InvalidStatKey { .. }
917            | ParseNum { .. }
918            | InvalidRole { .. }
919            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
920
921            #[cfg(feature = "pg_kvbackend")]
922            PostgresExecution { .. }
923            | CreatePostgresPool { .. }
924            | GetPostgresConnection { .. }
925            | PostgresTransaction { .. } => StatusCode::Internal,
926            #[cfg(feature = "mysql_kvbackend")]
927            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
928                StatusCode::Internal
929            }
930            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
931            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
932            Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
933        }
934    }
935
936    fn as_any(&self) -> &dyn std::any::Any {
937        self
938    }
939}
940
941impl Error {
942    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
943    /// Check if the error is a serialization error.
944    pub fn is_serialization_error(&self) -> bool {
945        match self {
946            #[cfg(feature = "pg_kvbackend")]
947            Error::PostgresTransaction { error, .. } => {
948                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
949            }
950            #[cfg(feature = "pg_kvbackend")]
951            Error::PostgresExecution { error, .. } => {
952                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
953            }
954            #[cfg(feature = "mysql_kvbackend")]
955            Error::MySqlExecution {
956                error: sqlx::Error::Database(database_error),
957                ..
958            } => {
959                matches!(
960                    database_error.message(),
961                    "Deadlock found when trying to get lock; try restarting transaction"
962                        | "can't serialize access for this transaction"
963                )
964            }
965            _ => false,
966        }
967    }
968
969    /// Creates a new [Error::RetryLater] error from source `err`.
970    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
971        Error::RetryLater {
972            source: BoxedError::new(err),
973        }
974    }
975
976    /// Determine whether it is a retry later type through [StatusCode]
977    pub fn is_retry_later(&self) -> bool {
978        matches!(self, Error::RetryLater { .. })
979    }
980
981    /// Determine whether it needs to clean poisons.
982    pub fn need_clean_poisons(&self) -> bool {
983        matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons)
984    }
985
986    /// Returns true if the response exceeds the size limit.
987    pub fn is_exceeded_size_limit(&self) -> bool {
988        match self {
989            Error::EtcdFailed {
990                error: etcd_client::Error::GRpcStatus(status),
991                ..
992            } => status.code() == tonic::Code::OutOfRange,
993            Error::ResponseExceededSizeLimit { .. } => true,
994            _ => false,
995        }
996    }
997}