common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to submit procedure"))]
108    SubmitProcedure {
109        #[snafu(implicit)]
110        location: Location,
111        source: common_procedure::Error,
112    },
113
114    #[snafu(display("Failed to query procedure"))]
115    QueryProcedure {
116        #[snafu(implicit)]
117        location: Location,
118        source: common_procedure::Error,
119    },
120
121    #[snafu(display("Procedure not found: {pid}"))]
122    ProcedureNotFound {
123        #[snafu(implicit)]
124        location: Location,
125        pid: String,
126    },
127
128    #[snafu(display("Failed to parse procedure id: {key}"))]
129    ParseProcedureId {
130        #[snafu(implicit)]
131        location: Location,
132        key: String,
133        #[snafu(source)]
134        error: common_procedure::ParseIdError,
135    },
136
137    #[snafu(display("Unsupported operation {}", operation))]
138    Unsupported {
139        operation: String,
140        #[snafu(implicit)]
141        location: Location,
142    },
143
144    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145    ProcedureStateReceiver {
146        procedure_id: ProcedureId,
147        #[snafu(implicit)]
148        location: Location,
149        source: common_procedure::Error,
150    },
151
152    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153    ProcedureStateReceiverNotFound {
154        procedure_id: ProcedureId,
155        #[snafu(implicit)]
156        location: Location,
157    },
158
159    #[snafu(display("Failed to wait procedure done"))]
160    WaitProcedure {
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Failed to start procedure manager"))]
167    StartProcedureManager {
168        #[snafu(implicit)]
169        location: Location,
170        source: common_procedure::Error,
171    },
172
173    #[snafu(display("Failed to stop procedure manager"))]
174    StopProcedureManager {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display(
181        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182    ))]
183    ProcedureOutput {
184        procedure_id: String,
185        err_msg: String,
186        #[snafu(implicit)]
187        location: Location,
188    },
189
190    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191    ConvertRawTableInfo {
192        #[snafu(implicit)]
193        location: Location,
194        source: datatypes::Error,
195    },
196
197    #[snafu(display("Primary key '{key}' not found when creating region request"))]
198    PrimaryKeyNotFound {
199        key: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Failed to build table meta for table: {}", table_name))]
205    BuildTableMeta {
206        table_name: String,
207        #[snafu(source)]
208        error: table::metadata::TableMetaBuilderError,
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Table occurs error"))]
214    Table {
215        #[snafu(implicit)]
216        location: Location,
217        source: table::error::Error,
218    },
219
220    #[snafu(display("Failed to find table route for table id {}", table_id))]
221    TableRouteNotFound {
222        table_id: TableId,
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Failed to decode protobuf"))]
228    DecodeProto {
229        #[snafu(implicit)]
230        location: Location,
231        #[snafu(source)]
232        error: prost::DecodeError,
233    },
234
235    #[snafu(display("Failed to encode object into json"))]
236    EncodeJson {
237        #[snafu(implicit)]
238        location: Location,
239        #[snafu(source)]
240        error: JsonError,
241    },
242
243    #[snafu(display("Failed to decode object from json"))]
244    DecodeJson {
245        #[snafu(implicit)]
246        location: Location,
247        #[snafu(source)]
248        error: JsonError,
249    },
250
251    #[snafu(display("Failed to serialize to json: {}", input))]
252    SerializeToJson {
253        input: String,
254        #[snafu(source)]
255        error: serde_json::error::Error,
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to deserialize from json: {}", input))]
261    DeserializeFromJson {
262        input: String,
263        #[snafu(source)]
264        error: serde_json::error::Error,
265        #[snafu(implicit)]
266        location: Location,
267    },
268
269    #[snafu(display("Payload not exist"))]
270    PayloadNotExist {
271        #[snafu(implicit)]
272        location: Location,
273    },
274
275    #[snafu(display("Failed to serde json"))]
276    SerdeJson {
277        #[snafu(source)]
278        error: serde_json::error::Error,
279        #[snafu(implicit)]
280        location: Location,
281    },
282
283    #[snafu(display("Failed to parse value {} into key {}", value, key))]
284    ParseOption {
285        key: String,
286        value: String,
287        #[snafu(implicit)]
288        location: Location,
289    },
290
291    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
292    RouteInfoCorrupted {
293        err_msg: String,
294        #[snafu(implicit)]
295        location: Location,
296    },
297
298    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
299    IllegalServerState {
300        code: i32,
301        err_msg: String,
302        #[snafu(implicit)]
303        location: Location,
304    },
305
306    #[snafu(display("Failed to convert alter table request"))]
307    ConvertAlterTableRequest {
308        source: common_grpc_expr::error::Error,
309        #[snafu(implicit)]
310        location: Location,
311    },
312
313    #[snafu(display("Invalid protobuf message: {err_msg}"))]
314    InvalidProtoMsg {
315        err_msg: String,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Unexpected: {err_msg}"))]
321    Unexpected {
322        err_msg: String,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Table already exists, table: {}", table_name))]
328    TableAlreadyExists {
329        table_name: String,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("View already exists, view: {}", view_name))]
335    ViewAlreadyExists {
336        view_name: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("Flow already exists: {}", flow_name))]
342    FlowAlreadyExists {
343        flow_name: String,
344        #[snafu(implicit)]
345        location: Location,
346    },
347
348    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
349    SchemaAlreadyExists {
350        catalog: String,
351        schema: String,
352        #[snafu(implicit)]
353        location: Location,
354    },
355
356    #[snafu(display("Failed to convert raw key to str"))]
357    ConvertRawKey {
358        #[snafu(implicit)]
359        location: Location,
360        #[snafu(source)]
361        error: Utf8Error,
362    },
363
364    #[snafu(display("Table not found: '{}'", table_name))]
365    TableNotFound {
366        table_name: String,
367        #[snafu(implicit)]
368        location: Location,
369    },
370
371    #[snafu(display("Region not found: {}", region_id))]
372    RegionNotFound {
373        region_id: RegionId,
374        #[snafu(implicit)]
375        location: Location,
376    },
377
378    #[snafu(display("View not found: '{}'", view_name))]
379    ViewNotFound {
380        view_name: String,
381        #[snafu(implicit)]
382        location: Location,
383    },
384
385    #[snafu(display("Flow not found: '{}'", flow_name))]
386    FlowNotFound {
387        flow_name: String,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("Flow route not found: '{}'", flow_name))]
393    FlowRouteNotFound {
394        flow_name: String,
395        #[snafu(implicit)]
396        location: Location,
397    },
398
399    #[snafu(display("Schema nod found, schema: {}", table_schema))]
400    SchemaNotFound {
401        table_schema: String,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Catalog not found, catalog: {}", catalog))]
407    CatalogNotFound {
408        catalog: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Invalid metadata, err: {}", err_msg))]
414    InvalidMetadata {
415        err_msg: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[snafu(display("Invalid view info, err: {}", err_msg))]
421    InvalidViewInfo {
422        err_msg: String,
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Invalid flow request body: {:?}", body))]
428    InvalidFlowRequestBody {
429        body: Box<Option<api::v1::flow::flow_request::Body>>,
430        #[snafu(implicit)]
431        location: Location,
432    },
433
434    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
435    GetKvCache { err_msg: String },
436
437    #[snafu(display("Get null from cache, key: {}", key))]
438    CacheNotGet {
439        key: String,
440        #[snafu(implicit)]
441        location: Location,
442    },
443
444    #[snafu(display("Etcd txn error: {err_msg}"))]
445    EtcdTxnOpResponse {
446        err_msg: String,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("External error"))]
452    External {
453        #[snafu(implicit)]
454        location: Location,
455        source: BoxedError,
456    },
457
458    #[snafu(display("The response exceeded size limit"))]
459    ResponseExceededSizeLimit {
460        #[snafu(implicit)]
461        location: Location,
462        source: BoxedError,
463    },
464
465    #[snafu(display("Invalid heartbeat response"))]
466    InvalidHeartbeatResponse {
467        #[snafu(implicit)]
468        location: Location,
469    },
470
471    #[snafu(display("Failed to operate on datanode: {}", peer))]
472    OperateDatanode {
473        #[snafu(implicit)]
474        location: Location,
475        peer: Peer,
476        source: BoxedError,
477    },
478
479    #[snafu(display("Retry later"))]
480    RetryLater {
481        source: BoxedError,
482        clean_poisons: bool,
483    },
484
485    #[snafu(display("Abort procedure"))]
486    AbortProcedure {
487        #[snafu(implicit)]
488        location: Location,
489        source: BoxedError,
490        clean_poisons: bool,
491    },
492
493    #[snafu(display(
494        "Failed to encode a wal options to json string, wal_options: {:?}",
495        wal_options
496    ))]
497    EncodeWalOptions {
498        wal_options: WalOptions,
499        #[snafu(source)]
500        error: serde_json::Error,
501        #[snafu(implicit)]
502        location: Location,
503    },
504
505    #[snafu(display("Invalid number of topics {}", num_topics))]
506    InvalidNumTopics {
507        num_topics: usize,
508        #[snafu(implicit)]
509        location: Location,
510    },
511
512    #[snafu(display(
513        "Failed to build a Kafka client, broker endpoints: {:?}",
514        broker_endpoints
515    ))]
516    BuildKafkaClient {
517        broker_endpoints: Vec<String>,
518        #[snafu(implicit)]
519        location: Location,
520        #[snafu(source)]
521        error: rskafka::client::error::Error,
522    },
523
524    #[snafu(display("Failed to create TLS Config"))]
525    TlsConfig {
526        #[snafu(implicit)]
527        location: Location,
528        source: common_wal::error::Error,
529    },
530
531    #[snafu(display("Failed to build a Kafka controller client"))]
532    BuildKafkaCtrlClient {
533        #[snafu(implicit)]
534        location: Location,
535        #[snafu(source)]
536        error: rskafka::client::error::Error,
537    },
538
539    #[snafu(display(
540        "Failed to get a Kafka partition client, topic: {}, partition: {}",
541        topic,
542        partition
543    ))]
544    KafkaPartitionClient {
545        topic: String,
546        partition: i32,
547        #[snafu(implicit)]
548        location: Location,
549        #[snafu(source)]
550        error: rskafka::client::error::Error,
551    },
552
553    #[snafu(display(
554        "Failed to get offset from Kafka, topic: {}, partition: {}",
555        topic,
556        partition
557    ))]
558    KafkaGetOffset {
559        topic: String,
560        partition: i32,
561        #[snafu(implicit)]
562        location: Location,
563        #[snafu(source)]
564        error: rskafka::client::error::Error,
565    },
566
567    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
568    ProduceRecord {
569        topic: String,
570        #[snafu(implicit)]
571        location: Location,
572        #[snafu(source)]
573        error: rskafka::client::error::Error,
574    },
575
576    #[snafu(display("Failed to create a Kafka wal topic"))]
577    CreateKafkaWalTopic {
578        #[snafu(implicit)]
579        location: Location,
580        #[snafu(source)]
581        error: rskafka::client::error::Error,
582    },
583
584    #[snafu(display("The topic pool is empty"))]
585    EmptyTopicPool {
586        #[snafu(implicit)]
587        location: Location,
588    },
589
590    #[snafu(display("Unexpected table route type: {}", err_msg))]
591    UnexpectedLogicalRouteTable {
592        #[snafu(implicit)]
593        location: Location,
594        err_msg: String,
595    },
596
597    #[snafu(display("The tasks of {} cannot be empty", name))]
598    EmptyDdlTasks {
599        name: String,
600        #[snafu(implicit)]
601        location: Location,
602    },
603
604    #[snafu(display("Metadata corruption: {}", err_msg))]
605    MetadataCorruption {
606        err_msg: String,
607        #[snafu(implicit)]
608        location: Location,
609    },
610
611    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
612    AlterLogicalTablesInvalidArguments {
613        err_msg: String,
614        #[snafu(implicit)]
615        location: Location,
616    },
617
618    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
619    CreateLogicalTablesInvalidArguments {
620        err_msg: String,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display("Invalid node info key: {}", key))]
626    InvalidNodeInfoKey {
627        key: String,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Invalid node stat key: {}", key))]
633    InvalidStatKey {
634        key: String,
635        #[snafu(implicit)]
636        location: Location,
637    },
638
639    #[snafu(display("Failed to parse number: {}", err_msg))]
640    ParseNum {
641        err_msg: String,
642        #[snafu(source)]
643        error: std::num::ParseIntError,
644        #[snafu(implicit)]
645        location: Location,
646    },
647
648    #[snafu(display("Invalid role: {}", role))]
649    InvalidRole {
650        role: i32,
651        #[snafu(implicit)]
652        location: Location,
653    },
654
655    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
656    InvalidSetDatabaseOption {
657        key: String,
658        value: String,
659        #[snafu(implicit)]
660        location: Location,
661    },
662
663    #[snafu(display("Invalid unset database option, key: {}", key))]
664    InvalidUnsetDatabaseOption {
665        key: String,
666        #[snafu(implicit)]
667        location: Location,
668    },
669
670    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
671    MismatchPrefix {
672        prefix: String,
673        key: String,
674        #[snafu(implicit)]
675        location: Location,
676    },
677
678    #[snafu(display("Failed to move values: {err_msg}"))]
679    MoveValues {
680        err_msg: String,
681        #[snafu(implicit)]
682        location: Location,
683    },
684
685    #[snafu(display("Failed to parse {} from utf8", name))]
686    FromUtf8 {
687        name: String,
688        #[snafu(source)]
689        error: std::string::FromUtf8Error,
690        #[snafu(implicit)]
691        location: Location,
692    },
693
694    #[snafu(display("Value not exists"))]
695    ValueNotExist {
696        #[snafu(implicit)]
697        location: Location,
698    },
699
700    #[snafu(display("Failed to get cache"))]
701    GetCache { source: Arc<Error> },
702
703    #[cfg(feature = "pg_kvbackend")]
704    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
705    PostgresExecution {
706        sql: String,
707        #[snafu(source)]
708        error: tokio_postgres::Error,
709        #[snafu(implicit)]
710        location: Location,
711    },
712
713    #[cfg(feature = "pg_kvbackend")]
714    #[snafu(display("Failed to create connection pool for Postgres"))]
715    CreatePostgresPool {
716        #[snafu(source)]
717        error: deadpool_postgres::CreatePoolError,
718        #[snafu(implicit)]
719        location: Location,
720    },
721
722    #[cfg(feature = "pg_kvbackend")]
723    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
724    GetPostgresConnection {
725        reason: String,
726        #[snafu(implicit)]
727        location: Location,
728    },
729
730    #[cfg(feature = "pg_kvbackend")]
731    #[snafu(display("Failed to {} Postgres transaction", operation))]
732    PostgresTransaction {
733        #[snafu(source)]
734        error: tokio_postgres::Error,
735        #[snafu(implicit)]
736        location: Location,
737        operation: String,
738    },
739
740    #[cfg(feature = "pg_kvbackend")]
741    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
742    PostgresTlsConfig {
743        reason: String,
744        #[snafu(implicit)]
745        location: Location,
746    },
747
748    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
749    LoadTlsCertificate {
750        path: String,
751        #[snafu(source)]
752        error: std::io::Error,
753        #[snafu(implicit)]
754        location: Location,
755    },
756
757    #[cfg(feature = "pg_kvbackend")]
758    #[snafu(display("Invalid TLS configuration: {}", reason))]
759    InvalidTlsConfig {
760        reason: String,
761        #[snafu(implicit)]
762        location: Location,
763    },
764
765    #[cfg(feature = "mysql_kvbackend")]
766    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
767    MySqlExecution {
768        sql: String,
769        #[snafu(source)]
770        error: sqlx::Error,
771        #[snafu(implicit)]
772        location: Location,
773    },
774
775    #[cfg(feature = "mysql_kvbackend")]
776    #[snafu(display("Failed to create connection pool for MySql"))]
777    CreateMySqlPool {
778        #[snafu(source)]
779        error: sqlx::Error,
780        #[snafu(implicit)]
781        location: Location,
782    },
783
784    #[cfg(feature = "mysql_kvbackend")]
785    #[snafu(display("Failed to {} MySql transaction", operation))]
786    MySqlTransaction {
787        #[snafu(source)]
788        error: sqlx::Error,
789        #[snafu(implicit)]
790        location: Location,
791        operation: String,
792    },
793
794    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
795    #[snafu(display("Rds transaction retry failed"))]
796    RdsTransactionRetryFailed {
797        #[snafu(implicit)]
798        location: Location,
799    },
800
801    #[snafu(display(
802        "Datanode table info not found, table id: {}, datanode id: {}",
803        table_id,
804        datanode_id
805    ))]
806    DatanodeTableInfoNotFound {
807        datanode_id: DatanodeId,
808        table_id: TableId,
809        #[snafu(implicit)]
810        location: Location,
811    },
812
813    #[snafu(display("Invalid topic name prefix: {}", prefix))]
814    InvalidTopicNamePrefix {
815        prefix: String,
816        #[snafu(implicit)]
817        location: Location,
818    },
819
820    #[snafu(display("Failed to parse wal options: {}", wal_options))]
821    ParseWalOptions {
822        wal_options: String,
823        #[snafu(implicit)]
824        location: Location,
825        #[snafu(source)]
826        error: serde_json::Error,
827    },
828
829    #[snafu(display("No leader found for table_id: {}", table_id))]
830    NoLeader {
831        table_id: TableId,
832        #[snafu(implicit)]
833        location: Location,
834    },
835
836    #[snafu(display(
837        "Procedure poison key already exists with a different value, key: {}, value: {}",
838        key,
839        value
840    ))]
841    ProcedurePoisonConflict {
842        key: String,
843        value: String,
844        #[snafu(implicit)]
845        location: Location,
846    },
847
848    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
849    PutPoison {
850        #[snafu(implicit)]
851        location: Location,
852        #[snafu(source)]
853        source: common_procedure::error::Error,
854    },
855
856    #[snafu(display("Failed to parse timezone"))]
857    InvalidTimeZone {
858        #[snafu(implicit)]
859        location: Location,
860        #[snafu(source)]
861        error: common_time::error::Error,
862    },
863    #[snafu(display("Invalid file path: {}", file_path))]
864    InvalidFilePath {
865        #[snafu(implicit)]
866        location: Location,
867        file_path: String,
868    },
869
870    #[snafu(display("Failed to serialize flexbuffers"))]
871    SerializeFlexbuffers {
872        #[snafu(implicit)]
873        location: Location,
874        #[snafu(source)]
875        error: flexbuffers::SerializationError,
876    },
877
878    #[snafu(display("Failed to deserialize flexbuffers"))]
879    DeserializeFlexbuffers {
880        #[snafu(implicit)]
881        location: Location,
882        #[snafu(source)]
883        error: flexbuffers::DeserializationError,
884    },
885
886    #[snafu(display("Failed to read flexbuffers"))]
887    ReadFlexbuffers {
888        #[snafu(implicit)]
889        location: Location,
890        #[snafu(source)]
891        error: flexbuffers::ReaderError,
892    },
893
894    #[snafu(display("Invalid file name: {}", reason))]
895    InvalidFileName {
896        #[snafu(implicit)]
897        location: Location,
898        reason: String,
899    },
900
901    #[snafu(display("Invalid file extension: {}", reason))]
902    InvalidFileExtension {
903        #[snafu(implicit)]
904        location: Location,
905        reason: String,
906    },
907
908    #[snafu(display("Failed to write object, file path: {}", file_path))]
909    WriteObject {
910        #[snafu(implicit)]
911        location: Location,
912        file_path: String,
913        #[snafu(source)]
914        error: object_store::Error,
915    },
916
917    #[snafu(display("Failed to read object, file path: {}", file_path))]
918    ReadObject {
919        #[snafu(implicit)]
920        location: Location,
921        file_path: String,
922        #[snafu(source)]
923        error: object_store::Error,
924    },
925
926    #[snafu(display("Missing column ids"))]
927    MissingColumnIds {
928        #[snafu(implicit)]
929        location: Location,
930    },
931
932    #[snafu(display(
933        "Missing column in column metadata: {}, table: {}, table_id: {}",
934        column_name,
935        table_name,
936        table_id,
937    ))]
938    MissingColumnInColumnMetadata {
939        column_name: String,
940        #[snafu(implicit)]
941        location: Location,
942        table_name: String,
943        table_id: TableId,
944    },
945
946    #[snafu(display(
947        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
948        column_name,
949        column_id,
950        table_name,
951        table_id,
952    ))]
953    MismatchColumnId {
954        column_name: String,
955        column_id: u32,
956        #[snafu(implicit)]
957        location: Location,
958        table_name: String,
959        table_id: TableId,
960    },
961
962    #[snafu(display("Failed to convert column def, column: {}", column))]
963    ConvertColumnDef {
964        column: String,
965        #[snafu(implicit)]
966        location: Location,
967        source: api::error::Error,
968    },
969
970    #[snafu(display("Failed to convert time ranges"))]
971    ConvertTimeRanges {
972        #[snafu(implicit)]
973        location: Location,
974        source: api::error::Error,
975    },
976
977    #[snafu(display(
978        "Column metadata inconsistencies found in table: {}, table_id: {}",
979        table_name,
980        table_id
981    ))]
982    ColumnMetadataConflicts {
983        table_name: String,
984        table_id: TableId,
985    },
986
987    #[snafu(display(
988        "Column not found in column metadata, column_name: {}, column_id: {}",
989        column_name,
990        column_id
991    ))]
992    ColumnNotFound { column_name: String, column_id: u32 },
993
994    #[snafu(display(
995        "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
996        column_name,
997        expected_column_id,
998        actual_column_id
999    ))]
1000    ColumnIdMismatch {
1001        column_name: String,
1002        expected_column_id: u32,
1003        actual_column_id: u32,
1004    },
1005
1006    #[snafu(display(
1007        "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1008        expected_column_name,
1009        expected_column_id,
1010        actual_column_name,
1011        actual_column_id,
1012    ))]
1013    TimestampMismatch {
1014        expected_column_name: String,
1015        expected_column_id: u32,
1016        actual_column_name: String,
1017        actual_column_id: u32,
1018    },
1019
1020    #[cfg(feature = "enterprise")]
1021    #[snafu(display("Too large duration"))]
1022    TooLargeDuration {
1023        #[snafu(source)]
1024        error: prost_types::DurationError,
1025        #[snafu(implicit)]
1026        location: Location,
1027    },
1028
1029    #[cfg(feature = "enterprise")]
1030    #[snafu(display("Negative duration"))]
1031    NegativeDuration {
1032        #[snafu(source)]
1033        error: prost_types::DurationError,
1034        #[snafu(implicit)]
1035        location: Location,
1036    },
1037
1038    #[cfg(feature = "enterprise")]
1039    #[snafu(display("Missing interval field"))]
1040    MissingInterval {
1041        #[snafu(implicit)]
1042        location: Location,
1043    },
1044}
1045
1046pub type Result<T> = std::result::Result<T, Error>;
1047
1048impl ErrorExt for Error {
1049    fn status_code(&self) -> StatusCode {
1050        use Error::*;
1051        match self {
1052            IllegalServerState { .. }
1053            | EtcdTxnOpResponse { .. }
1054            | EtcdFailed { .. }
1055            | EtcdTxnFailed { .. }
1056            | ConnectEtcd { .. }
1057            | MoveValues { .. }
1058            | GetCache { .. }
1059            | SerializeToJson { .. }
1060            | DeserializeFromJson { .. } => StatusCode::Internal,
1061
1062            NoLeader { .. } => StatusCode::TableUnavailable,
1063            ValueNotExist { .. }
1064            | ProcedurePoisonConflict { .. }
1065            | ProcedureStateReceiverNotFound { .. }
1066            | MissingColumnIds { .. }
1067            | MissingColumnInColumnMetadata { .. }
1068            | MismatchColumnId { .. }
1069            | ColumnMetadataConflicts { .. }
1070            | ColumnNotFound { .. }
1071            | ColumnIdMismatch { .. }
1072            | TimestampMismatch { .. } => StatusCode::Unexpected,
1073
1074            Unsupported { .. } => StatusCode::Unsupported,
1075            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1076
1077            SerdeJson { .. }
1078            | ParseOption { .. }
1079            | RouteInfoCorrupted { .. }
1080            | InvalidProtoMsg { .. }
1081            | InvalidMetadata { .. }
1082            | Unexpected { .. }
1083            | TableInfoNotFound { .. }
1084            | NextSequence { .. }
1085            | UnexpectedSequenceValue { .. }
1086            | InvalidHeartbeatResponse { .. }
1087            | EncodeJson { .. }
1088            | DecodeJson { .. }
1089            | PayloadNotExist { .. }
1090            | ConvertRawKey { .. }
1091            | DecodeProto { .. }
1092            | BuildTableMeta { .. }
1093            | TableRouteNotFound { .. }
1094            | ConvertRawTableInfo { .. }
1095            | RegionOperatingRace { .. }
1096            | EncodeWalOptions { .. }
1097            | BuildKafkaClient { .. }
1098            | BuildKafkaCtrlClient { .. }
1099            | KafkaPartitionClient { .. }
1100            | ProduceRecord { .. }
1101            | CreateKafkaWalTopic { .. }
1102            | EmptyTopicPool { .. }
1103            | UnexpectedLogicalRouteTable { .. }
1104            | ProcedureOutput { .. }
1105            | FromUtf8 { .. }
1106            | MetadataCorruption { .. }
1107            | ParseWalOptions { .. }
1108            | KafkaGetOffset { .. }
1109            | ReadFlexbuffers { .. }
1110            | SerializeFlexbuffers { .. }
1111            | DeserializeFlexbuffers { .. }
1112            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1113
1114            GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1115
1116            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1117
1118            ProcedureNotFound { .. }
1119            | InvalidViewInfo { .. }
1120            | PrimaryKeyNotFound { .. }
1121            | EmptyKey { .. }
1122            | AlterLogicalTablesInvalidArguments { .. }
1123            | CreateLogicalTablesInvalidArguments { .. }
1124            | MismatchPrefix { .. }
1125            | TlsConfig { .. }
1126            | InvalidSetDatabaseOption { .. }
1127            | InvalidUnsetDatabaseOption { .. }
1128            | InvalidTopicNamePrefix { .. }
1129            | InvalidTimeZone { .. }
1130            | InvalidFileExtension { .. }
1131            | InvalidFileName { .. }
1132            | InvalidFlowRequestBody { .. }
1133            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1134
1135            #[cfg(feature = "enterprise")]
1136            MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1137                StatusCode::InvalidArguments
1138            }
1139
1140            FlowNotFound { .. } => StatusCode::FlowNotFound,
1141            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1142            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1143
1144            ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1145                StatusCode::TableNotFound
1146            }
1147            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1148
1149            SubmitProcedure { source, .. }
1150            | QueryProcedure { source, .. }
1151            | WaitProcedure { source, .. }
1152            | StartProcedureManager { source, .. }
1153            | StopProcedureManager { source, .. } => source.status_code(),
1154            RegisterProcedureLoader { source, .. } => source.status_code(),
1155            External { source, .. } => source.status_code(),
1156            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1157            OperateDatanode { source, .. } => source.status_code(),
1158            Table { source, .. } => source.status_code(),
1159            RetryLater { source, .. } => source.status_code(),
1160            AbortProcedure { source, .. } => source.status_code(),
1161            ConvertAlterTableRequest { source, .. } => source.status_code(),
1162            PutPoison { source, .. } => source.status_code(),
1163            ConvertColumnDef { source, .. } => source.status_code(),
1164            ProcedureStateReceiver { source, .. } => source.status_code(),
1165
1166            ParseProcedureId { .. }
1167            | InvalidNumTopics { .. }
1168            | SchemaNotFound { .. }
1169            | CatalogNotFound { .. }
1170            | InvalidNodeInfoKey { .. }
1171            | InvalidStatKey { .. }
1172            | ParseNum { .. }
1173            | InvalidRole { .. }
1174            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1175
1176            LoadTlsCertificate { .. } => StatusCode::Internal,
1177
1178            #[cfg(feature = "pg_kvbackend")]
1179            PostgresExecution { .. }
1180            | CreatePostgresPool { .. }
1181            | GetPostgresConnection { .. }
1182            | PostgresTransaction { .. }
1183            | PostgresTlsConfig { .. }
1184            | InvalidTlsConfig { .. } => StatusCode::Internal,
1185            #[cfg(feature = "mysql_kvbackend")]
1186            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1187                StatusCode::Internal
1188            }
1189            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1190            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1191            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1192        }
1193    }
1194
1195    fn as_any(&self) -> &dyn std::any::Any {
1196        self
1197    }
1198}
1199
1200impl Error {
1201    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1202    /// Check if the error is a serialization error.
1203    pub fn is_serialization_error(&self) -> bool {
1204        match self {
1205            #[cfg(feature = "pg_kvbackend")]
1206            Error::PostgresTransaction { error, .. } => {
1207                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1208            }
1209            #[cfg(feature = "pg_kvbackend")]
1210            Error::PostgresExecution { error, .. } => {
1211                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1212            }
1213            #[cfg(feature = "mysql_kvbackend")]
1214            Error::MySqlExecution {
1215                error: sqlx::Error::Database(database_error),
1216                ..
1217            } => {
1218                matches!(
1219                    database_error.message(),
1220                    "Deadlock found when trying to get lock; try restarting transaction"
1221                        | "can't serialize access for this transaction"
1222                )
1223            }
1224            _ => false,
1225        }
1226    }
1227
1228    /// Creates a new [Error::RetryLater] error from source `err`.
1229    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1230        Error::RetryLater {
1231            source: BoxedError::new(err),
1232            clean_poisons: false,
1233        }
1234    }
1235
1236    /// Determine whether it is a retry later type through [StatusCode]
1237    pub fn is_retry_later(&self) -> bool {
1238        matches!(self, Error::RetryLater { .. })
1239    }
1240
1241    /// Determine whether it needs to clean poisons.
1242    pub fn need_clean_poisons(&self) -> bool {
1243        matches!(
1244            self,
1245            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1246        ) || matches!(
1247            self,
1248            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1249        )
1250    }
1251
1252    /// Returns true if the response exceeds the size limit.
1253    pub fn is_exceeded_size_limit(&self) -> bool {
1254        match self {
1255            Error::EtcdFailed {
1256                error: etcd_client::Error::GRpcStatus(status),
1257                ..
1258            } => status.code() == tonic::Code::OutOfRange,
1259            Error::ResponseExceededSizeLimit { .. } => true,
1260            _ => false,
1261        }
1262    }
1263}