common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::peer::Peer;
29use crate::DatanodeId;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to submit procedure"))]
108    SubmitProcedure {
109        #[snafu(implicit)]
110        location: Location,
111        source: common_procedure::Error,
112    },
113
114    #[snafu(display("Failed to query procedure"))]
115    QueryProcedure {
116        #[snafu(implicit)]
117        location: Location,
118        source: common_procedure::Error,
119    },
120
121    #[snafu(display("Procedure not found: {pid}"))]
122    ProcedureNotFound {
123        #[snafu(implicit)]
124        location: Location,
125        pid: String,
126    },
127
128    #[snafu(display("Failed to parse procedure id: {key}"))]
129    ParseProcedureId {
130        #[snafu(implicit)]
131        location: Location,
132        key: String,
133        #[snafu(source)]
134        error: common_procedure::ParseIdError,
135    },
136
137    #[snafu(display("Unsupported operation {}", operation))]
138    Unsupported {
139        operation: String,
140        #[snafu(implicit)]
141        location: Location,
142    },
143
144    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145    ProcedureStateReceiver {
146        procedure_id: ProcedureId,
147        #[snafu(implicit)]
148        location: Location,
149        source: common_procedure::Error,
150    },
151
152    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153    ProcedureStateReceiverNotFound {
154        procedure_id: ProcedureId,
155        #[snafu(implicit)]
156        location: Location,
157    },
158
159    #[snafu(display("Failed to wait procedure done"))]
160    WaitProcedure {
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Failed to start procedure manager"))]
167    StartProcedureManager {
168        #[snafu(implicit)]
169        location: Location,
170        source: common_procedure::Error,
171    },
172
173    #[snafu(display("Failed to stop procedure manager"))]
174    StopProcedureManager {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display(
181        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182    ))]
183    ProcedureOutput {
184        procedure_id: String,
185        err_msg: String,
186        #[snafu(implicit)]
187        location: Location,
188    },
189
190    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191    ConvertRawTableInfo {
192        #[snafu(implicit)]
193        location: Location,
194        source: datatypes::Error,
195    },
196
197    #[snafu(display("Primary key '{key}' not found when creating region request"))]
198    PrimaryKeyNotFound {
199        key: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Failed to build table meta for table: {}", table_name))]
205    BuildTableMeta {
206        table_name: String,
207        #[snafu(source)]
208        error: table::metadata::TableMetaBuilderError,
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Table occurs error"))]
214    Table {
215        #[snafu(implicit)]
216        location: Location,
217        source: table::error::Error,
218    },
219
220    #[snafu(display("Failed to find table route for table id {}", table_id))]
221    TableRouteNotFound {
222        table_id: TableId,
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Failed to decode protobuf"))]
228    DecodeProto {
229        #[snafu(implicit)]
230        location: Location,
231        #[snafu(source)]
232        error: prost::DecodeError,
233    },
234
235    #[snafu(display("Failed to encode object into json"))]
236    EncodeJson {
237        #[snafu(implicit)]
238        location: Location,
239        #[snafu(source)]
240        error: JsonError,
241    },
242
243    #[snafu(display("Failed to decode object from json"))]
244    DecodeJson {
245        #[snafu(implicit)]
246        location: Location,
247        #[snafu(source)]
248        error: JsonError,
249    },
250
251    #[snafu(display("Failed to serialize to json: {}", input))]
252    SerializeToJson {
253        input: String,
254        #[snafu(source)]
255        error: serde_json::error::Error,
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to deserialize from json: {}", input))]
261    DeserializeFromJson {
262        input: String,
263        #[snafu(source)]
264        error: serde_json::error::Error,
265        #[snafu(implicit)]
266        location: Location,
267    },
268
269    #[snafu(display("Payload not exist"))]
270    PayloadNotExist {
271        #[snafu(implicit)]
272        location: Location,
273    },
274
275    #[snafu(display("Failed to send message: {err_msg}"))]
276    SendMessage {
277        err_msg: String,
278        #[snafu(implicit)]
279        location: Location,
280    },
281
282    #[snafu(display("Failed to serde json"))]
283    SerdeJson {
284        #[snafu(source)]
285        error: serde_json::error::Error,
286        #[snafu(implicit)]
287        location: Location,
288    },
289
290    #[snafu(display("Failed to parse value {} into key {}", value, key))]
291    ParseOption {
292        key: String,
293        value: String,
294        #[snafu(implicit)]
295        location: Location,
296    },
297
298    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
299    RouteInfoCorrupted {
300        err_msg: String,
301        #[snafu(implicit)]
302        location: Location,
303    },
304
305    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
306    IllegalServerState {
307        code: i32,
308        err_msg: String,
309        #[snafu(implicit)]
310        location: Location,
311    },
312
313    #[snafu(display("Failed to convert alter table request"))]
314    ConvertAlterTableRequest {
315        source: common_grpc_expr::error::Error,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Invalid protobuf message: {err_msg}"))]
321    InvalidProtoMsg {
322        err_msg: String,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Unexpected: {err_msg}"))]
328    Unexpected {
329        err_msg: String,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("Table already exists, table: {}", table_name))]
335    TableAlreadyExists {
336        table_name: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("View already exists, view: {}", view_name))]
342    ViewAlreadyExists {
343        view_name: String,
344        #[snafu(implicit)]
345        location: Location,
346    },
347
348    #[snafu(display("Flow already exists: {}", flow_name))]
349    FlowAlreadyExists {
350        flow_name: String,
351        #[snafu(implicit)]
352        location: Location,
353    },
354
355    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
356    SchemaAlreadyExists {
357        catalog: String,
358        schema: String,
359        #[snafu(implicit)]
360        location: Location,
361    },
362
363    #[snafu(display("Failed to convert raw key to str"))]
364    ConvertRawKey {
365        #[snafu(implicit)]
366        location: Location,
367        #[snafu(source)]
368        error: Utf8Error,
369    },
370
371    #[snafu(display("Table not found: '{}'", table_name))]
372    TableNotFound {
373        table_name: String,
374        #[snafu(implicit)]
375        location: Location,
376    },
377
378    #[snafu(display("View not found: '{}'", view_name))]
379    ViewNotFound {
380        view_name: String,
381        #[snafu(implicit)]
382        location: Location,
383    },
384
385    #[snafu(display("Flow not found: '{}'", flow_name))]
386    FlowNotFound {
387        flow_name: String,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("Flow route not found: '{}'", flow_name))]
393    FlowRouteNotFound {
394        flow_name: String,
395        #[snafu(implicit)]
396        location: Location,
397    },
398
399    #[snafu(display("Schema nod found, schema: {}", table_schema))]
400    SchemaNotFound {
401        table_schema: String,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Catalog not found, catalog: {}", catalog))]
407    CatalogNotFound {
408        catalog: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Invalid metadata, err: {}", err_msg))]
414    InvalidMetadata {
415        err_msg: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[snafu(display("Invalid view info, err: {}", err_msg))]
421    InvalidViewInfo {
422        err_msg: String,
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Invalid flow request body: {:?}", body))]
428    InvalidFlowRequestBody {
429        body: Box<Option<api::v1::flow::flow_request::Body>>,
430        #[snafu(implicit)]
431        location: Location,
432    },
433
434    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
435    GetKvCache { err_msg: String },
436
437    #[snafu(display("Get null from cache, key: {}", key))]
438    CacheNotGet {
439        key: String,
440        #[snafu(implicit)]
441        location: Location,
442    },
443
444    #[snafu(display("Etcd txn error: {err_msg}"))]
445    EtcdTxnOpResponse {
446        err_msg: String,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("External error"))]
452    External {
453        #[snafu(implicit)]
454        location: Location,
455        source: BoxedError,
456    },
457
458    #[snafu(display("The response exceeded size limit"))]
459    ResponseExceededSizeLimit {
460        #[snafu(implicit)]
461        location: Location,
462        source: BoxedError,
463    },
464
465    #[snafu(display("Invalid heartbeat response"))]
466    InvalidHeartbeatResponse {
467        #[snafu(implicit)]
468        location: Location,
469    },
470
471    #[snafu(display("Failed to operate on datanode: {}", peer))]
472    OperateDatanode {
473        #[snafu(implicit)]
474        location: Location,
475        peer: Peer,
476        source: BoxedError,
477    },
478
479    #[snafu(display("Retry later"))]
480    RetryLater {
481        source: BoxedError,
482        clean_poisons: bool,
483    },
484
485    #[snafu(display("Abort procedure"))]
486    AbortProcedure {
487        #[snafu(implicit)]
488        location: Location,
489        source: BoxedError,
490        clean_poisons: bool,
491    },
492
493    #[snafu(display(
494        "Failed to encode a wal options to json string, wal_options: {:?}",
495        wal_options
496    ))]
497    EncodeWalOptions {
498        wal_options: WalOptions,
499        #[snafu(source)]
500        error: serde_json::Error,
501        #[snafu(implicit)]
502        location: Location,
503    },
504
505    #[snafu(display("Invalid number of topics {}", num_topics))]
506    InvalidNumTopics {
507        num_topics: usize,
508        #[snafu(implicit)]
509        location: Location,
510    },
511
512    #[snafu(display(
513        "Failed to build a Kafka client, broker endpoints: {:?}",
514        broker_endpoints
515    ))]
516    BuildKafkaClient {
517        broker_endpoints: Vec<String>,
518        #[snafu(implicit)]
519        location: Location,
520        #[snafu(source)]
521        error: rskafka::client::error::Error,
522    },
523
524    #[snafu(display("Failed to create TLS Config"))]
525    TlsConfig {
526        #[snafu(implicit)]
527        location: Location,
528        source: common_wal::error::Error,
529    },
530
531    #[snafu(display("Failed to resolve Kafka broker endpoint."))]
532    ResolveKafkaEndpoint { source: common_wal::error::Error },
533
534    #[snafu(display("Failed to build a Kafka controller client"))]
535    BuildKafkaCtrlClient {
536        #[snafu(implicit)]
537        location: Location,
538        #[snafu(source)]
539        error: rskafka::client::error::Error,
540    },
541
542    #[snafu(display(
543        "Failed to get a Kafka partition client, topic: {}, partition: {}",
544        topic,
545        partition
546    ))]
547    KafkaPartitionClient {
548        topic: String,
549        partition: i32,
550        #[snafu(implicit)]
551        location: Location,
552        #[snafu(source)]
553        error: rskafka::client::error::Error,
554    },
555
556    #[snafu(display(
557        "Failed to get offset from Kafka, topic: {}, partition: {}",
558        topic,
559        partition
560    ))]
561    KafkaGetOffset {
562        topic: String,
563        partition: i32,
564        #[snafu(implicit)]
565        location: Location,
566        #[snafu(source)]
567        error: rskafka::client::error::Error,
568    },
569
570    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
571    ProduceRecord {
572        topic: String,
573        #[snafu(implicit)]
574        location: Location,
575        #[snafu(source)]
576        error: rskafka::client::error::Error,
577    },
578
579    #[snafu(display("Failed to create a Kafka wal topic"))]
580    CreateKafkaWalTopic {
581        #[snafu(implicit)]
582        location: Location,
583        #[snafu(source)]
584        error: rskafka::client::error::Error,
585    },
586
587    #[snafu(display("The topic pool is empty"))]
588    EmptyTopicPool {
589        #[snafu(implicit)]
590        location: Location,
591    },
592
593    #[snafu(display("Unexpected table route type: {}", err_msg))]
594    UnexpectedLogicalRouteTable {
595        #[snafu(implicit)]
596        location: Location,
597        err_msg: String,
598    },
599
600    #[snafu(display("The tasks of {} cannot be empty", name))]
601    EmptyDdlTasks {
602        name: String,
603        #[snafu(implicit)]
604        location: Location,
605    },
606
607    #[snafu(display("Metadata corruption: {}", err_msg))]
608    MetadataCorruption {
609        err_msg: String,
610        #[snafu(implicit)]
611        location: Location,
612    },
613
614    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
615    AlterLogicalTablesInvalidArguments {
616        err_msg: String,
617        #[snafu(implicit)]
618        location: Location,
619    },
620
621    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
622    CreateLogicalTablesInvalidArguments {
623        err_msg: String,
624        #[snafu(implicit)]
625        location: Location,
626    },
627
628    #[snafu(display("Invalid node info key: {}", key))]
629    InvalidNodeInfoKey {
630        key: String,
631        #[snafu(implicit)]
632        location: Location,
633    },
634
635    #[snafu(display("Invalid node stat key: {}", key))]
636    InvalidStatKey {
637        key: String,
638        #[snafu(implicit)]
639        location: Location,
640    },
641
642    #[snafu(display("Failed to parse number: {}", err_msg))]
643    ParseNum {
644        err_msg: String,
645        #[snafu(source)]
646        error: std::num::ParseIntError,
647        #[snafu(implicit)]
648        location: Location,
649    },
650
651    #[snafu(display("Invalid role: {}", role))]
652    InvalidRole {
653        role: i32,
654        #[snafu(implicit)]
655        location: Location,
656    },
657
658    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
659    InvalidSetDatabaseOption {
660        key: String,
661        value: String,
662        #[snafu(implicit)]
663        location: Location,
664    },
665
666    #[snafu(display("Invalid unset database option, key: {}", key))]
667    InvalidUnsetDatabaseOption {
668        key: String,
669        #[snafu(implicit)]
670        location: Location,
671    },
672
673    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
674    MismatchPrefix {
675        prefix: String,
676        key: String,
677        #[snafu(implicit)]
678        location: Location,
679    },
680
681    #[snafu(display("Failed to move values: {err_msg}"))]
682    MoveValues {
683        err_msg: String,
684        #[snafu(implicit)]
685        location: Location,
686    },
687
688    #[snafu(display("Failed to parse {} from utf8", name))]
689    FromUtf8 {
690        name: String,
691        #[snafu(source)]
692        error: std::string::FromUtf8Error,
693        #[snafu(implicit)]
694        location: Location,
695    },
696
697    #[snafu(display("Value not exists"))]
698    ValueNotExist {
699        #[snafu(implicit)]
700        location: Location,
701    },
702
703    #[snafu(display("Failed to get cache"))]
704    GetCache { source: Arc<Error> },
705
706    #[cfg(feature = "pg_kvbackend")]
707    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
708    PostgresExecution {
709        sql: String,
710        #[snafu(source)]
711        error: tokio_postgres::Error,
712        #[snafu(implicit)]
713        location: Location,
714    },
715
716    #[cfg(feature = "pg_kvbackend")]
717    #[snafu(display("Failed to create connection pool for Postgres"))]
718    CreatePostgresPool {
719        #[snafu(source)]
720        error: deadpool_postgres::CreatePoolError,
721        #[snafu(implicit)]
722        location: Location,
723    },
724
725    #[cfg(feature = "pg_kvbackend")]
726    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
727    GetPostgresConnection {
728        reason: String,
729        #[snafu(implicit)]
730        location: Location,
731    },
732
733    #[cfg(feature = "pg_kvbackend")]
734    #[snafu(display("Failed to {} Postgres transaction", operation))]
735    PostgresTransaction {
736        #[snafu(source)]
737        error: tokio_postgres::Error,
738        #[snafu(implicit)]
739        location: Location,
740        operation: String,
741    },
742
743    #[cfg(feature = "pg_kvbackend")]
744    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
745    PostgresTlsConfig {
746        reason: String,
747        #[snafu(implicit)]
748        location: Location,
749    },
750
751    #[cfg(feature = "pg_kvbackend")]
752    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
753    LoadTlsCertificate {
754        path: String,
755        #[snafu(source)]
756        error: std::io::Error,
757        #[snafu(implicit)]
758        location: Location,
759    },
760
761    #[cfg(feature = "pg_kvbackend")]
762    #[snafu(display("Invalid TLS configuration: {}", reason))]
763    InvalidTlsConfig {
764        reason: String,
765        #[snafu(implicit)]
766        location: Location,
767    },
768
769    #[cfg(feature = "mysql_kvbackend")]
770    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
771    MySqlExecution {
772        sql: String,
773        #[snafu(source)]
774        error: sqlx::Error,
775        #[snafu(implicit)]
776        location: Location,
777    },
778
779    #[cfg(feature = "mysql_kvbackend")]
780    #[snafu(display("Failed to create connection pool for MySql"))]
781    CreateMySqlPool {
782        #[snafu(source)]
783        error: sqlx::Error,
784        #[snafu(implicit)]
785        location: Location,
786    },
787
788    #[cfg(feature = "mysql_kvbackend")]
789    #[snafu(display("Failed to {} MySql transaction", operation))]
790    MySqlTransaction {
791        #[snafu(source)]
792        error: sqlx::Error,
793        #[snafu(implicit)]
794        location: Location,
795        operation: String,
796    },
797
798    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
799    #[snafu(display("Rds transaction retry failed"))]
800    RdsTransactionRetryFailed {
801        #[snafu(implicit)]
802        location: Location,
803    },
804
805    #[snafu(display(
806        "Datanode table info not found, table id: {}, datanode id: {}",
807        table_id,
808        datanode_id
809    ))]
810    DatanodeTableInfoNotFound {
811        datanode_id: DatanodeId,
812        table_id: TableId,
813        #[snafu(implicit)]
814        location: Location,
815    },
816
817    #[snafu(display("Invalid topic name prefix: {}", prefix))]
818    InvalidTopicNamePrefix {
819        prefix: String,
820        #[snafu(implicit)]
821        location: Location,
822    },
823
824    #[snafu(display("Failed to parse wal options: {}", wal_options))]
825    ParseWalOptions {
826        wal_options: String,
827        #[snafu(implicit)]
828        location: Location,
829        #[snafu(source)]
830        error: serde_json::Error,
831    },
832
833    #[snafu(display("No leader found for table_id: {}", table_id))]
834    NoLeader {
835        table_id: TableId,
836        #[snafu(implicit)]
837        location: Location,
838    },
839
840    #[snafu(display(
841        "Procedure poison key already exists with a different value, key: {}, value: {}",
842        key,
843        value
844    ))]
845    ProcedurePoisonConflict {
846        key: String,
847        value: String,
848        #[snafu(implicit)]
849        location: Location,
850    },
851
852    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
853    PutPoison {
854        #[snafu(implicit)]
855        location: Location,
856        #[snafu(source)]
857        source: common_procedure::error::Error,
858    },
859
860    #[snafu(display("Failed to parse timezone"))]
861    InvalidTimeZone {
862        #[snafu(implicit)]
863        location: Location,
864        #[snafu(source)]
865        error: common_time::error::Error,
866    },
867    #[snafu(display("Invalid file path: {}", file_path))]
868    InvalidFilePath {
869        #[snafu(implicit)]
870        location: Location,
871        file_path: String,
872    },
873
874    #[snafu(display("Failed to serialize flexbuffers"))]
875    SerializeFlexbuffers {
876        #[snafu(implicit)]
877        location: Location,
878        #[snafu(source)]
879        error: flexbuffers::SerializationError,
880    },
881
882    #[snafu(display("Failed to deserialize flexbuffers"))]
883    DeserializeFlexbuffers {
884        #[snafu(implicit)]
885        location: Location,
886        #[snafu(source)]
887        error: flexbuffers::DeserializationError,
888    },
889
890    #[snafu(display("Failed to read flexbuffers"))]
891    ReadFlexbuffers {
892        #[snafu(implicit)]
893        location: Location,
894        #[snafu(source)]
895        error: flexbuffers::ReaderError,
896    },
897
898    #[snafu(display("Invalid file name: {}", reason))]
899    InvalidFileName {
900        #[snafu(implicit)]
901        location: Location,
902        reason: String,
903    },
904
905    #[snafu(display("Invalid file extension: {}", reason))]
906    InvalidFileExtension {
907        #[snafu(implicit)]
908        location: Location,
909        reason: String,
910    },
911
912    #[snafu(display("Failed to write object, file path: {}", file_path))]
913    WriteObject {
914        #[snafu(implicit)]
915        location: Location,
916        file_path: String,
917        #[snafu(source)]
918        error: object_store::Error,
919    },
920
921    #[snafu(display("Failed to read object, file path: {}", file_path))]
922    ReadObject {
923        #[snafu(implicit)]
924        location: Location,
925        file_path: String,
926        #[snafu(source)]
927        error: object_store::Error,
928    },
929
930    #[snafu(display("Missing column ids"))]
931    MissingColumnIds {
932        #[snafu(implicit)]
933        location: Location,
934    },
935
936    #[snafu(display(
937        "Missing column in column metadata: {}, table: {}, table_id: {}",
938        column_name,
939        table_name,
940        table_id,
941    ))]
942    MissingColumnInColumnMetadata {
943        column_name: String,
944        #[snafu(implicit)]
945        location: Location,
946        table_name: String,
947        table_id: TableId,
948    },
949
950    #[snafu(display(
951        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
952        column_name,
953        column_id,
954        table_name,
955        table_id,
956    ))]
957    MismatchColumnId {
958        column_name: String,
959        column_id: u32,
960        #[snafu(implicit)]
961        location: Location,
962        table_name: String,
963        table_id: TableId,
964    },
965
966    #[snafu(display("Failed to convert column def, column: {}", column))]
967    ConvertColumnDef {
968        column: String,
969        #[snafu(implicit)]
970        location: Location,
971        source: api::error::Error,
972    },
973
974    #[snafu(display("Failed to convert time ranges"))]
975    ConvertTimeRanges {
976        #[snafu(implicit)]
977        location: Location,
978        source: api::error::Error,
979    },
980
981    #[snafu(display(
982        "Column metadata inconsistencies found in table: {}, table_id: {}",
983        table_name,
984        table_id
985    ))]
986    ColumnMetadataConflicts {
987        table_name: String,
988        table_id: TableId,
989    },
990}
991
992pub type Result<T> = std::result::Result<T, Error>;
993
994impl ErrorExt for Error {
995    fn status_code(&self) -> StatusCode {
996        use Error::*;
997        match self {
998            IllegalServerState { .. }
999            | EtcdTxnOpResponse { .. }
1000            | EtcdFailed { .. }
1001            | EtcdTxnFailed { .. }
1002            | ConnectEtcd { .. }
1003            | MoveValues { .. }
1004            | GetCache { .. }
1005            | SerializeToJson { .. }
1006            | DeserializeFromJson { .. } => StatusCode::Internal,
1007
1008            NoLeader { .. } => StatusCode::TableUnavailable,
1009            ValueNotExist { .. }
1010            | ProcedurePoisonConflict { .. }
1011            | ProcedureStateReceiverNotFound { .. }
1012            | MissingColumnIds { .. }
1013            | MissingColumnInColumnMetadata { .. }
1014            | MismatchColumnId { .. }
1015            | ColumnMetadataConflicts { .. } => StatusCode::Unexpected,
1016
1017            Unsupported { .. } => StatusCode::Unsupported,
1018            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1019
1020            SerdeJson { .. }
1021            | ParseOption { .. }
1022            | RouteInfoCorrupted { .. }
1023            | InvalidProtoMsg { .. }
1024            | InvalidMetadata { .. }
1025            | Unexpected { .. }
1026            | TableInfoNotFound { .. }
1027            | NextSequence { .. }
1028            | UnexpectedSequenceValue { .. }
1029            | InvalidHeartbeatResponse { .. }
1030            | EncodeJson { .. }
1031            | DecodeJson { .. }
1032            | PayloadNotExist { .. }
1033            | ConvertRawKey { .. }
1034            | DecodeProto { .. }
1035            | BuildTableMeta { .. }
1036            | TableRouteNotFound { .. }
1037            | ConvertRawTableInfo { .. }
1038            | RegionOperatingRace { .. }
1039            | EncodeWalOptions { .. }
1040            | BuildKafkaClient { .. }
1041            | BuildKafkaCtrlClient { .. }
1042            | KafkaPartitionClient { .. }
1043            | ResolveKafkaEndpoint { .. }
1044            | ProduceRecord { .. }
1045            | CreateKafkaWalTopic { .. }
1046            | EmptyTopicPool { .. }
1047            | UnexpectedLogicalRouteTable { .. }
1048            | ProcedureOutput { .. }
1049            | FromUtf8 { .. }
1050            | MetadataCorruption { .. }
1051            | ParseWalOptions { .. }
1052            | KafkaGetOffset { .. }
1053            | ReadFlexbuffers { .. }
1054            | SerializeFlexbuffers { .. }
1055            | DeserializeFlexbuffers { .. }
1056            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1057
1058            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1059
1060            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1061
1062            ProcedureNotFound { .. }
1063            | InvalidViewInfo { .. }
1064            | PrimaryKeyNotFound { .. }
1065            | EmptyKey { .. }
1066            | AlterLogicalTablesInvalidArguments { .. }
1067            | CreateLogicalTablesInvalidArguments { .. }
1068            | MismatchPrefix { .. }
1069            | TlsConfig { .. }
1070            | InvalidSetDatabaseOption { .. }
1071            | InvalidUnsetDatabaseOption { .. }
1072            | InvalidTopicNamePrefix { .. }
1073            | InvalidTimeZone { .. }
1074            | InvalidFileExtension { .. }
1075            | InvalidFileName { .. }
1076            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1077            InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
1078
1079            FlowNotFound { .. } => StatusCode::FlowNotFound,
1080            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1081            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1082
1083            ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
1084            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1085
1086            SubmitProcedure { source, .. }
1087            | QueryProcedure { source, .. }
1088            | WaitProcedure { source, .. }
1089            | StartProcedureManager { source, .. }
1090            | StopProcedureManager { source, .. } => source.status_code(),
1091            RegisterProcedureLoader { source, .. } => source.status_code(),
1092            External { source, .. } => source.status_code(),
1093            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1094            OperateDatanode { source, .. } => source.status_code(),
1095            Table { source, .. } => source.status_code(),
1096            RetryLater { source, .. } => source.status_code(),
1097            AbortProcedure { source, .. } => source.status_code(),
1098            ConvertAlterTableRequest { source, .. } => source.status_code(),
1099            PutPoison { source, .. } => source.status_code(),
1100            ConvertColumnDef { source, .. } => source.status_code(),
1101            ProcedureStateReceiver { source, .. } => source.status_code(),
1102
1103            ParseProcedureId { .. }
1104            | InvalidNumTopics { .. }
1105            | SchemaNotFound { .. }
1106            | CatalogNotFound { .. }
1107            | InvalidNodeInfoKey { .. }
1108            | InvalidStatKey { .. }
1109            | ParseNum { .. }
1110            | InvalidRole { .. }
1111            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1112
1113            #[cfg(feature = "pg_kvbackend")]
1114            PostgresExecution { .. }
1115            | CreatePostgresPool { .. }
1116            | GetPostgresConnection { .. }
1117            | PostgresTransaction { .. }
1118            | PostgresTlsConfig { .. }
1119            | LoadTlsCertificate { .. }
1120            | InvalidTlsConfig { .. } => StatusCode::Internal,
1121            #[cfg(feature = "mysql_kvbackend")]
1122            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1123                StatusCode::Internal
1124            }
1125            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1126            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1127            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1128        }
1129    }
1130
1131    fn as_any(&self) -> &dyn std::any::Any {
1132        self
1133    }
1134}
1135
1136impl Error {
1137    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1138    /// Check if the error is a serialization error.
1139    pub fn is_serialization_error(&self) -> bool {
1140        match self {
1141            #[cfg(feature = "pg_kvbackend")]
1142            Error::PostgresTransaction { error, .. } => {
1143                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1144            }
1145            #[cfg(feature = "pg_kvbackend")]
1146            Error::PostgresExecution { error, .. } => {
1147                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1148            }
1149            #[cfg(feature = "mysql_kvbackend")]
1150            Error::MySqlExecution {
1151                error: sqlx::Error::Database(database_error),
1152                ..
1153            } => {
1154                matches!(
1155                    database_error.message(),
1156                    "Deadlock found when trying to get lock; try restarting transaction"
1157                        | "can't serialize access for this transaction"
1158                )
1159            }
1160            _ => false,
1161        }
1162    }
1163
1164    /// Creates a new [Error::RetryLater] error from source `err`.
1165    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1166        Error::RetryLater {
1167            source: BoxedError::new(err),
1168            clean_poisons: false,
1169        }
1170    }
1171
1172    /// Determine whether it is a retry later type through [StatusCode]
1173    pub fn is_retry_later(&self) -> bool {
1174        matches!(self, Error::RetryLater { .. })
1175    }
1176
1177    /// Determine whether it needs to clean poisons.
1178    pub fn need_clean_poisons(&self) -> bool {
1179        matches!(
1180            self,
1181            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1182        ) || matches!(
1183            self,
1184            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1185        )
1186    }
1187
1188    /// Returns true if the response exceeds the size limit.
1189    pub fn is_exceeded_size_limit(&self) -> bool {
1190        match self {
1191            Error::EtcdFailed {
1192                error: etcd_client::Error::GRpcStatus(status),
1193                ..
1194            } => status.code() == tonic::Code::OutOfRange,
1195            Error::ResponseExceededSizeLimit { .. } => true,
1196            _ => false,
1197        }
1198    }
1199}