common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_wal::options::WalOptions;
22use serde_json::error::Error as JsonError;
23use snafu::{Location, Snafu};
24use store_api::storage::RegionId;
25use table::metadata::TableId;
26
27use crate::peer::Peer;
28use crate::DatanodeId;
29
30#[derive(Snafu)]
31#[snafu(visibility(pub))]
32#[stack_trace_debug]
33pub enum Error {
34    #[snafu(display("Empty key is not allowed"))]
35    EmptyKey {
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display(
41        "Another procedure is operating the region: {} on peer: {}",
42        region_id,
43        peer_id
44    ))]
45    RegionOperatingRace {
46        #[snafu(implicit)]
47        location: Location,
48        peer_id: DatanodeId,
49        region_id: RegionId,
50    },
51
52    #[snafu(display("Failed to connect to Etcd"))]
53    ConnectEtcd {
54        #[snafu(source)]
55        error: etcd_client::Error,
56        #[snafu(implicit)]
57        location: Location,
58    },
59
60    #[snafu(display("Failed to execute via Etcd"))]
61    EtcdFailed {
62        #[snafu(source)]
63        error: etcd_client::Error,
64        #[snafu(implicit)]
65        location: Location,
66    },
67
68    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
69    EtcdTxnFailed {
70        max_operations: usize,
71        #[snafu(source)]
72        error: etcd_client::Error,
73        #[snafu(implicit)]
74        location: Location,
75    },
76
77    #[snafu(display("Failed to get sequence: {}", err_msg))]
78    NextSequence {
79        err_msg: String,
80        #[snafu(implicit)]
81        location: Location,
82    },
83
84    #[snafu(display("Unexpected sequence value: {}", err_msg))]
85    UnexpectedSequenceValue {
86        err_msg: String,
87        #[snafu(implicit)]
88        location: Location,
89    },
90
91    #[snafu(display("Table info not found: {}", table))]
92    TableInfoNotFound {
93        table: String,
94        #[snafu(implicit)]
95        location: Location,
96    },
97
98    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
99    RegisterProcedureLoader {
100        type_name: String,
101        #[snafu(implicit)]
102        location: Location,
103        source: common_procedure::error::Error,
104    },
105
106    #[snafu(display("Failed to submit procedure"))]
107    SubmitProcedure {
108        #[snafu(implicit)]
109        location: Location,
110        source: common_procedure::Error,
111    },
112
113    #[snafu(display("Failed to query procedure"))]
114    QueryProcedure {
115        #[snafu(implicit)]
116        location: Location,
117        source: common_procedure::Error,
118    },
119
120    #[snafu(display("Procedure not found: {pid}"))]
121    ProcedureNotFound {
122        #[snafu(implicit)]
123        location: Location,
124        pid: String,
125    },
126
127    #[snafu(display("Failed to parse procedure id: {key}"))]
128    ParseProcedureId {
129        #[snafu(implicit)]
130        location: Location,
131        key: String,
132        #[snafu(source)]
133        error: common_procedure::ParseIdError,
134    },
135
136    #[snafu(display("Unsupported operation {}", operation))]
137    Unsupported {
138        operation: String,
139        #[snafu(implicit)]
140        location: Location,
141    },
142
143    #[snafu(display("Failed to wait procedure done"))]
144    WaitProcedure {
145        #[snafu(implicit)]
146        location: Location,
147        source: common_procedure::Error,
148    },
149
150    #[snafu(display("Failed to start procedure manager"))]
151    StartProcedureManager {
152        #[snafu(implicit)]
153        location: Location,
154        source: common_procedure::Error,
155    },
156
157    #[snafu(display("Failed to stop procedure manager"))]
158    StopProcedureManager {
159        #[snafu(implicit)]
160        location: Location,
161        source: common_procedure::Error,
162    },
163
164    #[snafu(display(
165        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
166    ))]
167    ProcedureOutput {
168        procedure_id: String,
169        err_msg: String,
170        #[snafu(implicit)]
171        location: Location,
172    },
173
174    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
175    ConvertRawTableInfo {
176        #[snafu(implicit)]
177        location: Location,
178        source: datatypes::Error,
179    },
180
181    #[snafu(display("Primary key '{key}' not found when creating region request"))]
182    PrimaryKeyNotFound {
183        key: String,
184        #[snafu(implicit)]
185        location: Location,
186    },
187
188    #[snafu(display("Failed to build table meta for table: {}", table_name))]
189    BuildTableMeta {
190        table_name: String,
191        #[snafu(source)]
192        error: table::metadata::TableMetaBuilderError,
193        #[snafu(implicit)]
194        location: Location,
195    },
196
197    #[snafu(display("Table occurs error"))]
198    Table {
199        #[snafu(implicit)]
200        location: Location,
201        source: table::error::Error,
202    },
203
204    #[snafu(display("Failed to find table route for table id {}", table_id))]
205    TableRouteNotFound {
206        table_id: TableId,
207        #[snafu(implicit)]
208        location: Location,
209    },
210
211    #[snafu(display("Failed to decode protobuf"))]
212    DecodeProto {
213        #[snafu(implicit)]
214        location: Location,
215        #[snafu(source)]
216        error: prost::DecodeError,
217    },
218
219    #[snafu(display("Failed to encode object into json"))]
220    EncodeJson {
221        #[snafu(implicit)]
222        location: Location,
223        #[snafu(source)]
224        error: JsonError,
225    },
226
227    #[snafu(display("Failed to decode object from json"))]
228    DecodeJson {
229        #[snafu(implicit)]
230        location: Location,
231        #[snafu(source)]
232        error: JsonError,
233    },
234
235    #[snafu(display("Failed to serialize to json: {}", input))]
236    SerializeToJson {
237        input: String,
238        #[snafu(source)]
239        error: serde_json::error::Error,
240        #[snafu(implicit)]
241        location: Location,
242    },
243
244    #[snafu(display("Failed to deserialize from json: {}", input))]
245    DeserializeFromJson {
246        input: String,
247        #[snafu(source)]
248        error: serde_json::error::Error,
249        #[snafu(implicit)]
250        location: Location,
251    },
252
253    #[snafu(display("Payload not exist"))]
254    PayloadNotExist {
255        #[snafu(implicit)]
256        location: Location,
257    },
258
259    #[snafu(display("Failed to send message: {err_msg}"))]
260    SendMessage {
261        err_msg: String,
262        #[snafu(implicit)]
263        location: Location,
264    },
265
266    #[snafu(display("Failed to serde json"))]
267    SerdeJson {
268        #[snafu(source)]
269        error: serde_json::error::Error,
270        #[snafu(implicit)]
271        location: Location,
272    },
273
274    #[snafu(display("Failed to parse value {} into key {}", value, key))]
275    ParseOption {
276        key: String,
277        value: String,
278        #[snafu(implicit)]
279        location: Location,
280    },
281
282    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
283    RouteInfoCorrupted {
284        err_msg: String,
285        #[snafu(implicit)]
286        location: Location,
287    },
288
289    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
290    IllegalServerState {
291        code: i32,
292        err_msg: String,
293        #[snafu(implicit)]
294        location: Location,
295    },
296
297    #[snafu(display("Failed to convert alter table request"))]
298    ConvertAlterTableRequest {
299        source: common_grpc_expr::error::Error,
300        #[snafu(implicit)]
301        location: Location,
302    },
303
304    #[snafu(display("Invalid protobuf message: {err_msg}"))]
305    InvalidProtoMsg {
306        err_msg: String,
307        #[snafu(implicit)]
308        location: Location,
309    },
310
311    #[snafu(display("Unexpected: {err_msg}"))]
312    Unexpected {
313        err_msg: String,
314        #[snafu(implicit)]
315        location: Location,
316    },
317
318    #[snafu(display("Table already exists, table: {}", table_name))]
319    TableAlreadyExists {
320        table_name: String,
321        #[snafu(implicit)]
322        location: Location,
323    },
324
325    #[snafu(display("View already exists, view: {}", view_name))]
326    ViewAlreadyExists {
327        view_name: String,
328        #[snafu(implicit)]
329        location: Location,
330    },
331
332    #[snafu(display("Flow already exists: {}", flow_name))]
333    FlowAlreadyExists {
334        flow_name: String,
335        #[snafu(implicit)]
336        location: Location,
337    },
338
339    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
340    SchemaAlreadyExists {
341        catalog: String,
342        schema: String,
343        #[snafu(implicit)]
344        location: Location,
345    },
346
347    #[snafu(display("Failed to convert raw key to str"))]
348    ConvertRawKey {
349        #[snafu(implicit)]
350        location: Location,
351        #[snafu(source)]
352        error: Utf8Error,
353    },
354
355    #[snafu(display("Table not found: '{}'", table_name))]
356    TableNotFound {
357        table_name: String,
358        #[snafu(implicit)]
359        location: Location,
360    },
361
362    #[snafu(display("View not found: '{}'", view_name))]
363    ViewNotFound {
364        view_name: String,
365        #[snafu(implicit)]
366        location: Location,
367    },
368
369    #[snafu(display("Flow not found: '{}'", flow_name))]
370    FlowNotFound {
371        flow_name: String,
372        #[snafu(implicit)]
373        location: Location,
374    },
375
376    #[snafu(display("Flow route not found: '{}'", flow_name))]
377    FlowRouteNotFound {
378        flow_name: String,
379        #[snafu(implicit)]
380        location: Location,
381    },
382
383    #[snafu(display("Schema nod found, schema: {}", table_schema))]
384    SchemaNotFound {
385        table_schema: String,
386        #[snafu(implicit)]
387        location: Location,
388    },
389
390    #[snafu(display("Invalid metadata, err: {}", err_msg))]
391    InvalidMetadata {
392        err_msg: String,
393        #[snafu(implicit)]
394        location: Location,
395    },
396
397    #[snafu(display("Invalid view info, err: {}", err_msg))]
398    InvalidViewInfo {
399        err_msg: String,
400        #[snafu(implicit)]
401        location: Location,
402    },
403
404    #[snafu(display("Invalid flow request body: {:?}", body))]
405    InvalidFlowRequestBody {
406        body: Box<Option<api::v1::flow::flow_request::Body>>,
407        #[snafu(implicit)]
408        location: Location,
409    },
410
411    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
412    GetKvCache { err_msg: String },
413
414    #[snafu(display("Get null from cache, key: {}", key))]
415    CacheNotGet {
416        key: String,
417        #[snafu(implicit)]
418        location: Location,
419    },
420
421    #[snafu(display("Etcd txn error: {err_msg}"))]
422    EtcdTxnOpResponse {
423        err_msg: String,
424        #[snafu(implicit)]
425        location: Location,
426    },
427
428    #[snafu(display("External error"))]
429    External {
430        #[snafu(implicit)]
431        location: Location,
432        source: BoxedError,
433    },
434
435    #[snafu(display("The response exceeded size limit"))]
436    ResponseExceededSizeLimit {
437        #[snafu(implicit)]
438        location: Location,
439        source: BoxedError,
440    },
441
442    #[snafu(display("Invalid heartbeat response"))]
443    InvalidHeartbeatResponse {
444        #[snafu(implicit)]
445        location: Location,
446    },
447
448    #[snafu(display("Failed to operate on datanode: {}", peer))]
449    OperateDatanode {
450        #[snafu(implicit)]
451        location: Location,
452        peer: Peer,
453        source: BoxedError,
454    },
455
456    #[snafu(display("Retry later"))]
457    RetryLater {
458        source: BoxedError,
459        clean_poisons: bool,
460    },
461
462    #[snafu(display("Abort procedure"))]
463    AbortProcedure {
464        #[snafu(implicit)]
465        location: Location,
466        source: BoxedError,
467        clean_poisons: bool,
468    },
469
470    #[snafu(display(
471        "Failed to encode a wal options to json string, wal_options: {:?}",
472        wal_options
473    ))]
474    EncodeWalOptions {
475        wal_options: WalOptions,
476        #[snafu(source)]
477        error: serde_json::Error,
478        #[snafu(implicit)]
479        location: Location,
480    },
481
482    #[snafu(display("Invalid number of topics {}", num_topics))]
483    InvalidNumTopics {
484        num_topics: usize,
485        #[snafu(implicit)]
486        location: Location,
487    },
488
489    #[snafu(display(
490        "Failed to build a Kafka client, broker endpoints: {:?}",
491        broker_endpoints
492    ))]
493    BuildKafkaClient {
494        broker_endpoints: Vec<String>,
495        #[snafu(implicit)]
496        location: Location,
497        #[snafu(source)]
498        error: rskafka::client::error::Error,
499    },
500
501    #[snafu(display("Failed to create TLS Config"))]
502    TlsConfig {
503        #[snafu(implicit)]
504        location: Location,
505        source: common_wal::error::Error,
506    },
507
508    #[snafu(display("Failed to resolve Kafka broker endpoint."))]
509    ResolveKafkaEndpoint { source: common_wal::error::Error },
510
511    #[snafu(display("Failed to build a Kafka controller client"))]
512    BuildKafkaCtrlClient {
513        #[snafu(implicit)]
514        location: Location,
515        #[snafu(source)]
516        error: rskafka::client::error::Error,
517    },
518
519    #[snafu(display(
520        "Failed to get a Kafka partition client, topic: {}, partition: {}",
521        topic,
522        partition
523    ))]
524    KafkaPartitionClient {
525        topic: String,
526        partition: i32,
527        #[snafu(implicit)]
528        location: Location,
529        #[snafu(source)]
530        error: rskafka::client::error::Error,
531    },
532
533    #[snafu(display(
534        "Failed to get offset from Kafka, topic: {}, partition: {}",
535        topic,
536        partition
537    ))]
538    KafkaGetOffset {
539        topic: String,
540        partition: i32,
541        #[snafu(implicit)]
542        location: Location,
543        #[snafu(source)]
544        error: rskafka::client::error::Error,
545    },
546
547    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
548    ProduceRecord {
549        topic: String,
550        #[snafu(implicit)]
551        location: Location,
552        #[snafu(source)]
553        error: rskafka::client::error::Error,
554    },
555
556    #[snafu(display("Failed to create a Kafka wal topic"))]
557    CreateKafkaWalTopic {
558        #[snafu(implicit)]
559        location: Location,
560        #[snafu(source)]
561        error: rskafka::client::error::Error,
562    },
563
564    #[snafu(display("The topic pool is empty"))]
565    EmptyTopicPool {
566        #[snafu(implicit)]
567        location: Location,
568    },
569
570    #[snafu(display("Unexpected table route type: {}", err_msg))]
571    UnexpectedLogicalRouteTable {
572        #[snafu(implicit)]
573        location: Location,
574        err_msg: String,
575    },
576
577    #[snafu(display("The tasks of {} cannot be empty", name))]
578    EmptyDdlTasks {
579        name: String,
580        #[snafu(implicit)]
581        location: Location,
582    },
583
584    #[snafu(display("Metadata corruption: {}", err_msg))]
585    MetadataCorruption {
586        err_msg: String,
587        #[snafu(implicit)]
588        location: Location,
589    },
590
591    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
592    AlterLogicalTablesInvalidArguments {
593        err_msg: String,
594        #[snafu(implicit)]
595        location: Location,
596    },
597
598    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
599    CreateLogicalTablesInvalidArguments {
600        err_msg: String,
601        #[snafu(implicit)]
602        location: Location,
603    },
604
605    #[snafu(display("Invalid node info key: {}", key))]
606    InvalidNodeInfoKey {
607        key: String,
608        #[snafu(implicit)]
609        location: Location,
610    },
611
612    #[snafu(display("Invalid node stat key: {}", key))]
613    InvalidStatKey {
614        key: String,
615        #[snafu(implicit)]
616        location: Location,
617    },
618
619    #[snafu(display("Failed to parse number: {}", err_msg))]
620    ParseNum {
621        err_msg: String,
622        #[snafu(source)]
623        error: std::num::ParseIntError,
624        #[snafu(implicit)]
625        location: Location,
626    },
627
628    #[snafu(display("Invalid role: {}", role))]
629    InvalidRole {
630        role: i32,
631        #[snafu(implicit)]
632        location: Location,
633    },
634
635    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
636    InvalidSetDatabaseOption {
637        key: String,
638        value: String,
639        #[snafu(implicit)]
640        location: Location,
641    },
642
643    #[snafu(display("Invalid unset database option, key: {}", key))]
644    InvalidUnsetDatabaseOption {
645        key: String,
646        #[snafu(implicit)]
647        location: Location,
648    },
649
650    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
651    MismatchPrefix {
652        prefix: String,
653        key: String,
654        #[snafu(implicit)]
655        location: Location,
656    },
657
658    #[snafu(display("Failed to move values: {err_msg}"))]
659    MoveValues {
660        err_msg: String,
661        #[snafu(implicit)]
662        location: Location,
663    },
664
665    #[snafu(display("Failed to parse {} from utf8", name))]
666    FromUtf8 {
667        name: String,
668        #[snafu(source)]
669        error: std::string::FromUtf8Error,
670        #[snafu(implicit)]
671        location: Location,
672    },
673
674    #[snafu(display("Value not exists"))]
675    ValueNotExist {
676        #[snafu(implicit)]
677        location: Location,
678    },
679
680    #[snafu(display("Failed to get cache"))]
681    GetCache { source: Arc<Error> },
682
683    #[cfg(feature = "pg_kvbackend")]
684    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
685    PostgresExecution {
686        sql: String,
687        #[snafu(source)]
688        error: tokio_postgres::Error,
689        #[snafu(implicit)]
690        location: Location,
691    },
692
693    #[cfg(feature = "pg_kvbackend")]
694    #[snafu(display("Failed to create connection pool for Postgres"))]
695    CreatePostgresPool {
696        #[snafu(source)]
697        error: deadpool_postgres::CreatePoolError,
698        #[snafu(implicit)]
699        location: Location,
700    },
701
702    #[cfg(feature = "pg_kvbackend")]
703    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
704    GetPostgresConnection {
705        reason: String,
706        #[snafu(implicit)]
707        location: Location,
708    },
709
710    #[cfg(feature = "pg_kvbackend")]
711    #[snafu(display("Failed to {} Postgres transaction", operation))]
712    PostgresTransaction {
713        #[snafu(source)]
714        error: tokio_postgres::Error,
715        #[snafu(implicit)]
716        location: Location,
717        operation: String,
718    },
719
720    #[cfg(feature = "mysql_kvbackend")]
721    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
722    MySqlExecution {
723        sql: String,
724        #[snafu(source)]
725        error: sqlx::Error,
726        #[snafu(implicit)]
727        location: Location,
728    },
729
730    #[cfg(feature = "mysql_kvbackend")]
731    #[snafu(display("Failed to create connection pool for MySql"))]
732    CreateMySqlPool {
733        #[snafu(source)]
734        error: sqlx::Error,
735        #[snafu(implicit)]
736        location: Location,
737    },
738
739    #[cfg(feature = "mysql_kvbackend")]
740    #[snafu(display("Failed to {} MySql transaction", operation))]
741    MySqlTransaction {
742        #[snafu(source)]
743        error: sqlx::Error,
744        #[snafu(implicit)]
745        location: Location,
746        operation: String,
747    },
748
749    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
750    #[snafu(display("Rds transaction retry failed"))]
751    RdsTransactionRetryFailed {
752        #[snafu(implicit)]
753        location: Location,
754    },
755
756    #[snafu(display(
757        "Datanode table info not found, table id: {}, datanode id: {}",
758        table_id,
759        datanode_id
760    ))]
761    DatanodeTableInfoNotFound {
762        datanode_id: DatanodeId,
763        table_id: TableId,
764        #[snafu(implicit)]
765        location: Location,
766    },
767
768    #[snafu(display("Invalid topic name prefix: {}", prefix))]
769    InvalidTopicNamePrefix {
770        prefix: String,
771        #[snafu(implicit)]
772        location: Location,
773    },
774
775    #[snafu(display("Failed to parse wal options: {}", wal_options))]
776    ParseWalOptions {
777        wal_options: String,
778        #[snafu(implicit)]
779        location: Location,
780        #[snafu(source)]
781        error: serde_json::Error,
782    },
783
784    #[snafu(display("No leader found for table_id: {}", table_id))]
785    NoLeader {
786        table_id: TableId,
787        #[snafu(implicit)]
788        location: Location,
789    },
790
791    #[snafu(display(
792        "Procedure poison key already exists with a different value, key: {}, value: {}",
793        key,
794        value
795    ))]
796    ProcedurePoisonConflict {
797        key: String,
798        value: String,
799        #[snafu(implicit)]
800        location: Location,
801    },
802
803    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
804    PutPoison {
805        #[snafu(implicit)]
806        location: Location,
807        #[snafu(source)]
808        source: common_procedure::error::Error,
809    },
810
811    #[snafu(display("Failed to parse timezone"))]
812    InvalidTimeZone {
813        #[snafu(implicit)]
814        location: Location,
815        #[snafu(source)]
816        error: common_time::error::Error,
817    },
818    #[snafu(display("Invalid file path: {}", file_path))]
819    InvalidFilePath {
820        #[snafu(implicit)]
821        location: Location,
822        file_path: String,
823    },
824
825    #[snafu(display("Failed to serialize flexbuffers"))]
826    SerializeFlexbuffers {
827        #[snafu(implicit)]
828        location: Location,
829        #[snafu(source)]
830        error: flexbuffers::SerializationError,
831    },
832
833    #[snafu(display("Failed to deserialize flexbuffers"))]
834    DeserializeFlexbuffers {
835        #[snafu(implicit)]
836        location: Location,
837        #[snafu(source)]
838        error: flexbuffers::DeserializationError,
839    },
840
841    #[snafu(display("Failed to read flexbuffers"))]
842    ReadFlexbuffers {
843        #[snafu(implicit)]
844        location: Location,
845        #[snafu(source)]
846        error: flexbuffers::ReaderError,
847    },
848
849    #[snafu(display("Invalid file name: {}", reason))]
850    InvalidFileName {
851        #[snafu(implicit)]
852        location: Location,
853        reason: String,
854    },
855
856    #[snafu(display("Invalid file extension: {}", reason))]
857    InvalidFileExtension {
858        #[snafu(implicit)]
859        location: Location,
860        reason: String,
861    },
862
863    #[snafu(display("Failed to write object, file path: {}", file_path))]
864    WriteObject {
865        #[snafu(implicit)]
866        location: Location,
867        file_path: String,
868        #[snafu(source)]
869        error: object_store::Error,
870    },
871
872    #[snafu(display("Failed to read object, file path: {}", file_path))]
873    ReadObject {
874        #[snafu(implicit)]
875        location: Location,
876        file_path: String,
877        #[snafu(source)]
878        error: object_store::Error,
879    },
880}
881
882pub type Result<T> = std::result::Result<T, Error>;
883
884impl ErrorExt for Error {
885    fn status_code(&self) -> StatusCode {
886        use Error::*;
887        match self {
888            IllegalServerState { .. }
889            | EtcdTxnOpResponse { .. }
890            | EtcdFailed { .. }
891            | EtcdTxnFailed { .. }
892            | ConnectEtcd { .. }
893            | MoveValues { .. }
894            | GetCache { .. }
895            | SerializeToJson { .. }
896            | DeserializeFromJson { .. } => StatusCode::Internal,
897
898            NoLeader { .. } => StatusCode::TableUnavailable,
899            ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
900
901            Unsupported { .. } => StatusCode::Unsupported,
902            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
903
904            SerdeJson { .. }
905            | ParseOption { .. }
906            | RouteInfoCorrupted { .. }
907            | InvalidProtoMsg { .. }
908            | InvalidMetadata { .. }
909            | Unexpected { .. }
910            | TableInfoNotFound { .. }
911            | NextSequence { .. }
912            | UnexpectedSequenceValue { .. }
913            | InvalidHeartbeatResponse { .. }
914            | EncodeJson { .. }
915            | DecodeJson { .. }
916            | PayloadNotExist { .. }
917            | ConvertRawKey { .. }
918            | DecodeProto { .. }
919            | BuildTableMeta { .. }
920            | TableRouteNotFound { .. }
921            | ConvertRawTableInfo { .. }
922            | RegionOperatingRace { .. }
923            | EncodeWalOptions { .. }
924            | BuildKafkaClient { .. }
925            | BuildKafkaCtrlClient { .. }
926            | KafkaPartitionClient { .. }
927            | ResolveKafkaEndpoint { .. }
928            | ProduceRecord { .. }
929            | CreateKafkaWalTopic { .. }
930            | EmptyTopicPool { .. }
931            | UnexpectedLogicalRouteTable { .. }
932            | ProcedureOutput { .. }
933            | FromUtf8 { .. }
934            | MetadataCorruption { .. }
935            | ParseWalOptions { .. }
936            | KafkaGetOffset { .. }
937            | ReadFlexbuffers { .. }
938            | SerializeFlexbuffers { .. }
939            | DeserializeFlexbuffers { .. } => StatusCode::Unexpected,
940
941            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
942
943            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
944
945            ProcedureNotFound { .. }
946            | InvalidViewInfo { .. }
947            | PrimaryKeyNotFound { .. }
948            | EmptyKey { .. }
949            | AlterLogicalTablesInvalidArguments { .. }
950            | CreateLogicalTablesInvalidArguments { .. }
951            | MismatchPrefix { .. }
952            | TlsConfig { .. }
953            | InvalidSetDatabaseOption { .. }
954            | InvalidUnsetDatabaseOption { .. }
955            | InvalidTopicNamePrefix { .. }
956            | InvalidTimeZone { .. }
957            | InvalidFileExtension { .. }
958            | InvalidFileName { .. }
959            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
960            InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
961
962            FlowNotFound { .. } => StatusCode::FlowNotFound,
963            FlowRouteNotFound { .. } => StatusCode::Unexpected,
964            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
965
966            ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
967            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
968
969            SubmitProcedure { source, .. }
970            | QueryProcedure { source, .. }
971            | WaitProcedure { source, .. }
972            | StartProcedureManager { source, .. }
973            | StopProcedureManager { source, .. } => source.status_code(),
974            RegisterProcedureLoader { source, .. } => source.status_code(),
975            External { source, .. } => source.status_code(),
976            ResponseExceededSizeLimit { source, .. } => source.status_code(),
977            OperateDatanode { source, .. } => source.status_code(),
978            Table { source, .. } => source.status_code(),
979            RetryLater { source, .. } => source.status_code(),
980            AbortProcedure { source, .. } => source.status_code(),
981            ConvertAlterTableRequest { source, .. } => source.status_code(),
982            PutPoison { source, .. } => source.status_code(),
983
984            ParseProcedureId { .. }
985            | InvalidNumTopics { .. }
986            | SchemaNotFound { .. }
987            | InvalidNodeInfoKey { .. }
988            | InvalidStatKey { .. }
989            | ParseNum { .. }
990            | InvalidRole { .. }
991            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
992
993            #[cfg(feature = "pg_kvbackend")]
994            PostgresExecution { .. }
995            | CreatePostgresPool { .. }
996            | GetPostgresConnection { .. }
997            | PostgresTransaction { .. } => StatusCode::Internal,
998            #[cfg(feature = "mysql_kvbackend")]
999            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1000                StatusCode::Internal
1001            }
1002            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1003            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1004            Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1005        }
1006    }
1007
1008    fn as_any(&self) -> &dyn std::any::Any {
1009        self
1010    }
1011}
1012
1013impl Error {
1014    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1015    /// Check if the error is a serialization error.
1016    pub fn is_serialization_error(&self) -> bool {
1017        match self {
1018            #[cfg(feature = "pg_kvbackend")]
1019            Error::PostgresTransaction { error, .. } => {
1020                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1021            }
1022            #[cfg(feature = "pg_kvbackend")]
1023            Error::PostgresExecution { error, .. } => {
1024                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1025            }
1026            #[cfg(feature = "mysql_kvbackend")]
1027            Error::MySqlExecution {
1028                error: sqlx::Error::Database(database_error),
1029                ..
1030            } => {
1031                matches!(
1032                    database_error.message(),
1033                    "Deadlock found when trying to get lock; try restarting transaction"
1034                        | "can't serialize access for this transaction"
1035                )
1036            }
1037            _ => false,
1038        }
1039    }
1040
1041    /// Creates a new [Error::RetryLater] error from source `err`.
1042    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1043        Error::RetryLater {
1044            source: BoxedError::new(err),
1045            clean_poisons: false,
1046        }
1047    }
1048
1049    /// Determine whether it is a retry later type through [StatusCode]
1050    pub fn is_retry_later(&self) -> bool {
1051        matches!(self, Error::RetryLater { .. })
1052    }
1053
1054    /// Determine whether it needs to clean poisons.
1055    pub fn need_clean_poisons(&self) -> bool {
1056        matches!(
1057            self,
1058            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1059        ) || matches!(
1060            self,
1061            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1062        )
1063    }
1064
1065    /// Returns true if the response exceeds the size limit.
1066    pub fn is_exceeded_size_limit(&self) -> bool {
1067        match self {
1068            Error::EtcdFailed {
1069                error: etcd_client::Error::GRpcStatus(status),
1070                ..
1071            } => status.code() == tonic::Code::OutOfRange,
1072            Error::ResponseExceededSizeLimit { .. } => true,
1073            _ => false,
1074        }
1075    }
1076}