common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to submit procedure"))]
108    SubmitProcedure {
109        #[snafu(implicit)]
110        location: Location,
111        source: common_procedure::Error,
112    },
113
114    #[snafu(display("Failed to query procedure"))]
115    QueryProcedure {
116        #[snafu(implicit)]
117        location: Location,
118        source: common_procedure::Error,
119    },
120
121    #[snafu(display("Procedure not found: {pid}"))]
122    ProcedureNotFound {
123        #[snafu(implicit)]
124        location: Location,
125        pid: String,
126    },
127
128    #[snafu(display("Failed to parse procedure id: {key}"))]
129    ParseProcedureId {
130        #[snafu(implicit)]
131        location: Location,
132        key: String,
133        #[snafu(source)]
134        error: common_procedure::ParseIdError,
135    },
136
137    #[snafu(display("Unsupported operation {}", operation))]
138    Unsupported {
139        operation: String,
140        #[snafu(implicit)]
141        location: Location,
142    },
143
144    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145    ProcedureStateReceiver {
146        procedure_id: ProcedureId,
147        #[snafu(implicit)]
148        location: Location,
149        source: common_procedure::Error,
150    },
151
152    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153    ProcedureStateReceiverNotFound {
154        procedure_id: ProcedureId,
155        #[snafu(implicit)]
156        location: Location,
157    },
158
159    #[snafu(display("Failed to wait procedure done"))]
160    WaitProcedure {
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Failed to start procedure manager"))]
167    StartProcedureManager {
168        #[snafu(implicit)]
169        location: Location,
170        source: common_procedure::Error,
171    },
172
173    #[snafu(display("Failed to stop procedure manager"))]
174    StopProcedureManager {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display(
181        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182    ))]
183    ProcedureOutput {
184        procedure_id: String,
185        err_msg: String,
186        #[snafu(implicit)]
187        location: Location,
188    },
189
190    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191    ConvertRawTableInfo {
192        #[snafu(implicit)]
193        location: Location,
194        source: datatypes::Error,
195    },
196
197    #[snafu(display("Primary key '{key}' not found when creating region request"))]
198    PrimaryKeyNotFound {
199        key: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Failed to build table meta for table: {}", table_name))]
205    BuildTableMeta {
206        table_name: String,
207        #[snafu(source)]
208        error: table::metadata::TableMetaBuilderError,
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Table occurs error"))]
214    Table {
215        #[snafu(implicit)]
216        location: Location,
217        source: table::error::Error,
218    },
219
220    #[snafu(display("Failed to find table route for table id {}", table_id))]
221    TableRouteNotFound {
222        table_id: TableId,
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Failed to decode protobuf"))]
228    DecodeProto {
229        #[snafu(implicit)]
230        location: Location,
231        #[snafu(source)]
232        error: prost::DecodeError,
233    },
234
235    #[snafu(display("Failed to encode object into json"))]
236    EncodeJson {
237        #[snafu(implicit)]
238        location: Location,
239        #[snafu(source)]
240        error: JsonError,
241    },
242
243    #[snafu(display("Failed to decode object from json"))]
244    DecodeJson {
245        #[snafu(implicit)]
246        location: Location,
247        #[snafu(source)]
248        error: JsonError,
249    },
250
251    #[snafu(display("Failed to serialize to json: {}", input))]
252    SerializeToJson {
253        input: String,
254        #[snafu(source)]
255        error: serde_json::error::Error,
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to deserialize from json: {}", input))]
261    DeserializeFromJson {
262        input: String,
263        #[snafu(source)]
264        error: serde_json::error::Error,
265        #[snafu(implicit)]
266        location: Location,
267    },
268
269    #[snafu(display("Payload not exist"))]
270    PayloadNotExist {
271        #[snafu(implicit)]
272        location: Location,
273    },
274
275    #[snafu(display("Failed to send message: {err_msg}"))]
276    SendMessage {
277        err_msg: String,
278        #[snafu(implicit)]
279        location: Location,
280    },
281
282    #[snafu(display("Failed to serde json"))]
283    SerdeJson {
284        #[snafu(source)]
285        error: serde_json::error::Error,
286        #[snafu(implicit)]
287        location: Location,
288    },
289
290    #[snafu(display("Failed to parse value {} into key {}", value, key))]
291    ParseOption {
292        key: String,
293        value: String,
294        #[snafu(implicit)]
295        location: Location,
296    },
297
298    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
299    RouteInfoCorrupted {
300        err_msg: String,
301        #[snafu(implicit)]
302        location: Location,
303    },
304
305    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
306    IllegalServerState {
307        code: i32,
308        err_msg: String,
309        #[snafu(implicit)]
310        location: Location,
311    },
312
313    #[snafu(display("Failed to convert alter table request"))]
314    ConvertAlterTableRequest {
315        source: common_grpc_expr::error::Error,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Invalid protobuf message: {err_msg}"))]
321    InvalidProtoMsg {
322        err_msg: String,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Unexpected: {err_msg}"))]
328    Unexpected {
329        err_msg: String,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("Table already exists, table: {}", table_name))]
335    TableAlreadyExists {
336        table_name: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("View already exists, view: {}", view_name))]
342    ViewAlreadyExists {
343        view_name: String,
344        #[snafu(implicit)]
345        location: Location,
346    },
347
348    #[snafu(display("Flow already exists: {}", flow_name))]
349    FlowAlreadyExists {
350        flow_name: String,
351        #[snafu(implicit)]
352        location: Location,
353    },
354
355    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
356    SchemaAlreadyExists {
357        catalog: String,
358        schema: String,
359        #[snafu(implicit)]
360        location: Location,
361    },
362
363    #[snafu(display("Failed to convert raw key to str"))]
364    ConvertRawKey {
365        #[snafu(implicit)]
366        location: Location,
367        #[snafu(source)]
368        error: Utf8Error,
369    },
370
371    #[snafu(display("Table not found: '{}'", table_name))]
372    TableNotFound {
373        table_name: String,
374        #[snafu(implicit)]
375        location: Location,
376    },
377
378    #[snafu(display("Region not found: {}", region_id))]
379    RegionNotFound {
380        region_id: RegionId,
381        #[snafu(implicit)]
382        location: Location,
383    },
384
385    #[snafu(display("View not found: '{}'", view_name))]
386    ViewNotFound {
387        view_name: String,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("Flow not found: '{}'", flow_name))]
393    FlowNotFound {
394        flow_name: String,
395        #[snafu(implicit)]
396        location: Location,
397    },
398
399    #[snafu(display("Flow route not found: '{}'", flow_name))]
400    FlowRouteNotFound {
401        flow_name: String,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Schema nod found, schema: {}", table_schema))]
407    SchemaNotFound {
408        table_schema: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Catalog not found, catalog: {}", catalog))]
414    CatalogNotFound {
415        catalog: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[snafu(display("Invalid metadata, err: {}", err_msg))]
421    InvalidMetadata {
422        err_msg: String,
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Invalid view info, err: {}", err_msg))]
428    InvalidViewInfo {
429        err_msg: String,
430        #[snafu(implicit)]
431        location: Location,
432    },
433
434    #[snafu(display("Invalid flow request body: {:?}", body))]
435    InvalidFlowRequestBody {
436        body: Box<Option<api::v1::flow::flow_request::Body>>,
437        #[snafu(implicit)]
438        location: Location,
439    },
440
441    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
442    GetKvCache { err_msg: String },
443
444    #[snafu(display("Get null from cache, key: {}", key))]
445    CacheNotGet {
446        key: String,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("Etcd txn error: {err_msg}"))]
452    EtcdTxnOpResponse {
453        err_msg: String,
454        #[snafu(implicit)]
455        location: Location,
456    },
457
458    #[snafu(display("External error"))]
459    External {
460        #[snafu(implicit)]
461        location: Location,
462        source: BoxedError,
463    },
464
465    #[snafu(display("The response exceeded size limit"))]
466    ResponseExceededSizeLimit {
467        #[snafu(implicit)]
468        location: Location,
469        source: BoxedError,
470    },
471
472    #[snafu(display("Invalid heartbeat response"))]
473    InvalidHeartbeatResponse {
474        #[snafu(implicit)]
475        location: Location,
476    },
477
478    #[snafu(display("Failed to operate on datanode: {}", peer))]
479    OperateDatanode {
480        #[snafu(implicit)]
481        location: Location,
482        peer: Peer,
483        source: BoxedError,
484    },
485
486    #[snafu(display("Retry later"))]
487    RetryLater {
488        source: BoxedError,
489        clean_poisons: bool,
490    },
491
492    #[snafu(display("Abort procedure"))]
493    AbortProcedure {
494        #[snafu(implicit)]
495        location: Location,
496        source: BoxedError,
497        clean_poisons: bool,
498    },
499
500    #[snafu(display(
501        "Failed to encode a wal options to json string, wal_options: {:?}",
502        wal_options
503    ))]
504    EncodeWalOptions {
505        wal_options: WalOptions,
506        #[snafu(source)]
507        error: serde_json::Error,
508        #[snafu(implicit)]
509        location: Location,
510    },
511
512    #[snafu(display("Invalid number of topics {}", num_topics))]
513    InvalidNumTopics {
514        num_topics: usize,
515        #[snafu(implicit)]
516        location: Location,
517    },
518
519    #[snafu(display(
520        "Failed to build a Kafka client, broker endpoints: {:?}",
521        broker_endpoints
522    ))]
523    BuildKafkaClient {
524        broker_endpoints: Vec<String>,
525        #[snafu(implicit)]
526        location: Location,
527        #[snafu(source)]
528        error: rskafka::client::error::Error,
529    },
530
531    #[snafu(display("Failed to create TLS Config"))]
532    TlsConfig {
533        #[snafu(implicit)]
534        location: Location,
535        source: common_wal::error::Error,
536    },
537
538    #[snafu(display("Failed to build a Kafka controller client"))]
539    BuildKafkaCtrlClient {
540        #[snafu(implicit)]
541        location: Location,
542        #[snafu(source)]
543        error: rskafka::client::error::Error,
544    },
545
546    #[snafu(display(
547        "Failed to get a Kafka partition client, topic: {}, partition: {}",
548        topic,
549        partition
550    ))]
551    KafkaPartitionClient {
552        topic: String,
553        partition: i32,
554        #[snafu(implicit)]
555        location: Location,
556        #[snafu(source)]
557        error: rskafka::client::error::Error,
558    },
559
560    #[snafu(display(
561        "Failed to get offset from Kafka, topic: {}, partition: {}",
562        topic,
563        partition
564    ))]
565    KafkaGetOffset {
566        topic: String,
567        partition: i32,
568        #[snafu(implicit)]
569        location: Location,
570        #[snafu(source)]
571        error: rskafka::client::error::Error,
572    },
573
574    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
575    ProduceRecord {
576        topic: String,
577        #[snafu(implicit)]
578        location: Location,
579        #[snafu(source)]
580        error: rskafka::client::error::Error,
581    },
582
583    #[snafu(display("Failed to create a Kafka wal topic"))]
584    CreateKafkaWalTopic {
585        #[snafu(implicit)]
586        location: Location,
587        #[snafu(source)]
588        error: rskafka::client::error::Error,
589    },
590
591    #[snafu(display("The topic pool is empty"))]
592    EmptyTopicPool {
593        #[snafu(implicit)]
594        location: Location,
595    },
596
597    #[snafu(display("Unexpected table route type: {}", err_msg))]
598    UnexpectedLogicalRouteTable {
599        #[snafu(implicit)]
600        location: Location,
601        err_msg: String,
602    },
603
604    #[snafu(display("The tasks of {} cannot be empty", name))]
605    EmptyDdlTasks {
606        name: String,
607        #[snafu(implicit)]
608        location: Location,
609    },
610
611    #[snafu(display("Metadata corruption: {}", err_msg))]
612    MetadataCorruption {
613        err_msg: String,
614        #[snafu(implicit)]
615        location: Location,
616    },
617
618    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
619    AlterLogicalTablesInvalidArguments {
620        err_msg: String,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
626    CreateLogicalTablesInvalidArguments {
627        err_msg: String,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Invalid node info key: {}", key))]
633    InvalidNodeInfoKey {
634        key: String,
635        #[snafu(implicit)]
636        location: Location,
637    },
638
639    #[snafu(display("Invalid node stat key: {}", key))]
640    InvalidStatKey {
641        key: String,
642        #[snafu(implicit)]
643        location: Location,
644    },
645
646    #[snafu(display("Failed to parse number: {}", err_msg))]
647    ParseNum {
648        err_msg: String,
649        #[snafu(source)]
650        error: std::num::ParseIntError,
651        #[snafu(implicit)]
652        location: Location,
653    },
654
655    #[snafu(display("Invalid role: {}", role))]
656    InvalidRole {
657        role: i32,
658        #[snafu(implicit)]
659        location: Location,
660    },
661
662    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
663    InvalidSetDatabaseOption {
664        key: String,
665        value: String,
666        #[snafu(implicit)]
667        location: Location,
668    },
669
670    #[snafu(display("Invalid unset database option, key: {}", key))]
671    InvalidUnsetDatabaseOption {
672        key: String,
673        #[snafu(implicit)]
674        location: Location,
675    },
676
677    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
678    MismatchPrefix {
679        prefix: String,
680        key: String,
681        #[snafu(implicit)]
682        location: Location,
683    },
684
685    #[snafu(display("Failed to move values: {err_msg}"))]
686    MoveValues {
687        err_msg: String,
688        #[snafu(implicit)]
689        location: Location,
690    },
691
692    #[snafu(display("Failed to parse {} from utf8", name))]
693    FromUtf8 {
694        name: String,
695        #[snafu(source)]
696        error: std::string::FromUtf8Error,
697        #[snafu(implicit)]
698        location: Location,
699    },
700
701    #[snafu(display("Value not exists"))]
702    ValueNotExist {
703        #[snafu(implicit)]
704        location: Location,
705    },
706
707    #[snafu(display("Failed to get cache"))]
708    GetCache { source: Arc<Error> },
709
710    #[cfg(feature = "pg_kvbackend")]
711    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
712    PostgresExecution {
713        sql: String,
714        #[snafu(source)]
715        error: tokio_postgres::Error,
716        #[snafu(implicit)]
717        location: Location,
718    },
719
720    #[cfg(feature = "pg_kvbackend")]
721    #[snafu(display("Failed to create connection pool for Postgres"))]
722    CreatePostgresPool {
723        #[snafu(source)]
724        error: deadpool_postgres::CreatePoolError,
725        #[snafu(implicit)]
726        location: Location,
727    },
728
729    #[cfg(feature = "pg_kvbackend")]
730    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
731    GetPostgresConnection {
732        reason: String,
733        #[snafu(implicit)]
734        location: Location,
735    },
736
737    #[cfg(feature = "pg_kvbackend")]
738    #[snafu(display("Failed to {} Postgres transaction", operation))]
739    PostgresTransaction {
740        #[snafu(source)]
741        error: tokio_postgres::Error,
742        #[snafu(implicit)]
743        location: Location,
744        operation: String,
745    },
746
747    #[cfg(feature = "pg_kvbackend")]
748    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
749    PostgresTlsConfig {
750        reason: String,
751        #[snafu(implicit)]
752        location: Location,
753    },
754
755    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
756    LoadTlsCertificate {
757        path: String,
758        #[snafu(source)]
759        error: std::io::Error,
760        #[snafu(implicit)]
761        location: Location,
762    },
763
764    #[cfg(feature = "pg_kvbackend")]
765    #[snafu(display("Invalid TLS configuration: {}", reason))]
766    InvalidTlsConfig {
767        reason: String,
768        #[snafu(implicit)]
769        location: Location,
770    },
771
772    #[cfg(feature = "mysql_kvbackend")]
773    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
774    MySqlExecution {
775        sql: String,
776        #[snafu(source)]
777        error: sqlx::Error,
778        #[snafu(implicit)]
779        location: Location,
780    },
781
782    #[cfg(feature = "mysql_kvbackend")]
783    #[snafu(display("Failed to create connection pool for MySql"))]
784    CreateMySqlPool {
785        #[snafu(source)]
786        error: sqlx::Error,
787        #[snafu(implicit)]
788        location: Location,
789    },
790
791    #[cfg(feature = "mysql_kvbackend")]
792    #[snafu(display("Failed to {} MySql transaction", operation))]
793    MySqlTransaction {
794        #[snafu(source)]
795        error: sqlx::Error,
796        #[snafu(implicit)]
797        location: Location,
798        operation: String,
799    },
800
801    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
802    #[snafu(display("Rds transaction retry failed"))]
803    RdsTransactionRetryFailed {
804        #[snafu(implicit)]
805        location: Location,
806    },
807
808    #[snafu(display(
809        "Datanode table info not found, table id: {}, datanode id: {}",
810        table_id,
811        datanode_id
812    ))]
813    DatanodeTableInfoNotFound {
814        datanode_id: DatanodeId,
815        table_id: TableId,
816        #[snafu(implicit)]
817        location: Location,
818    },
819
820    #[snafu(display("Invalid topic name prefix: {}", prefix))]
821    InvalidTopicNamePrefix {
822        prefix: String,
823        #[snafu(implicit)]
824        location: Location,
825    },
826
827    #[snafu(display("Failed to parse wal options: {}", wal_options))]
828    ParseWalOptions {
829        wal_options: String,
830        #[snafu(implicit)]
831        location: Location,
832        #[snafu(source)]
833        error: serde_json::Error,
834    },
835
836    #[snafu(display("No leader found for table_id: {}", table_id))]
837    NoLeader {
838        table_id: TableId,
839        #[snafu(implicit)]
840        location: Location,
841    },
842
843    #[snafu(display(
844        "Procedure poison key already exists with a different value, key: {}, value: {}",
845        key,
846        value
847    ))]
848    ProcedurePoisonConflict {
849        key: String,
850        value: String,
851        #[snafu(implicit)]
852        location: Location,
853    },
854
855    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
856    PutPoison {
857        #[snafu(implicit)]
858        location: Location,
859        #[snafu(source)]
860        source: common_procedure::error::Error,
861    },
862
863    #[snafu(display("Failed to parse timezone"))]
864    InvalidTimeZone {
865        #[snafu(implicit)]
866        location: Location,
867        #[snafu(source)]
868        error: common_time::error::Error,
869    },
870    #[snafu(display("Invalid file path: {}", file_path))]
871    InvalidFilePath {
872        #[snafu(implicit)]
873        location: Location,
874        file_path: String,
875    },
876
877    #[snafu(display("Failed to serialize flexbuffers"))]
878    SerializeFlexbuffers {
879        #[snafu(implicit)]
880        location: Location,
881        #[snafu(source)]
882        error: flexbuffers::SerializationError,
883    },
884
885    #[snafu(display("Failed to deserialize flexbuffers"))]
886    DeserializeFlexbuffers {
887        #[snafu(implicit)]
888        location: Location,
889        #[snafu(source)]
890        error: flexbuffers::DeserializationError,
891    },
892
893    #[snafu(display("Failed to read flexbuffers"))]
894    ReadFlexbuffers {
895        #[snafu(implicit)]
896        location: Location,
897        #[snafu(source)]
898        error: flexbuffers::ReaderError,
899    },
900
901    #[snafu(display("Invalid file name: {}", reason))]
902    InvalidFileName {
903        #[snafu(implicit)]
904        location: Location,
905        reason: String,
906    },
907
908    #[snafu(display("Invalid file extension: {}", reason))]
909    InvalidFileExtension {
910        #[snafu(implicit)]
911        location: Location,
912        reason: String,
913    },
914
915    #[snafu(display("Failed to write object, file path: {}", file_path))]
916    WriteObject {
917        #[snafu(implicit)]
918        location: Location,
919        file_path: String,
920        #[snafu(source)]
921        error: object_store::Error,
922    },
923
924    #[snafu(display("Failed to read object, file path: {}", file_path))]
925    ReadObject {
926        #[snafu(implicit)]
927        location: Location,
928        file_path: String,
929        #[snafu(source)]
930        error: object_store::Error,
931    },
932
933    #[snafu(display("Missing column ids"))]
934    MissingColumnIds {
935        #[snafu(implicit)]
936        location: Location,
937    },
938
939    #[snafu(display(
940        "Missing column in column metadata: {}, table: {}, table_id: {}",
941        column_name,
942        table_name,
943        table_id,
944    ))]
945    MissingColumnInColumnMetadata {
946        column_name: String,
947        #[snafu(implicit)]
948        location: Location,
949        table_name: String,
950        table_id: TableId,
951    },
952
953    #[snafu(display(
954        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
955        column_name,
956        column_id,
957        table_name,
958        table_id,
959    ))]
960    MismatchColumnId {
961        column_name: String,
962        column_id: u32,
963        #[snafu(implicit)]
964        location: Location,
965        table_name: String,
966        table_id: TableId,
967    },
968
969    #[snafu(display("Failed to convert column def, column: {}", column))]
970    ConvertColumnDef {
971        column: String,
972        #[snafu(implicit)]
973        location: Location,
974        source: api::error::Error,
975    },
976
977    #[snafu(display("Failed to convert time ranges"))]
978    ConvertTimeRanges {
979        #[snafu(implicit)]
980        location: Location,
981        source: api::error::Error,
982    },
983
984    #[snafu(display(
985        "Column metadata inconsistencies found in table: {}, table_id: {}",
986        table_name,
987        table_id
988    ))]
989    ColumnMetadataConflicts {
990        table_name: String,
991        table_id: TableId,
992    },
993
994    #[snafu(display(
995        "Column not found in column metadata, column_name: {}, column_id: {}",
996        column_name,
997        column_id
998    ))]
999    ColumnNotFound { column_name: String, column_id: u32 },
1000
1001    #[snafu(display(
1002        "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1003        column_name,
1004        expected_column_id,
1005        actual_column_id
1006    ))]
1007    ColumnIdMismatch {
1008        column_name: String,
1009        expected_column_id: u32,
1010        actual_column_id: u32,
1011    },
1012
1013    #[snafu(display(
1014        "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1015        expected_column_name,
1016        expected_column_id,
1017        actual_column_name,
1018        actual_column_id,
1019    ))]
1020    TimestampMismatch {
1021        expected_column_name: String,
1022        expected_column_id: u32,
1023        actual_column_name: String,
1024        actual_column_id: u32,
1025    },
1026
1027    #[cfg(feature = "enterprise")]
1028    #[snafu(display("Too large duration"))]
1029    TooLargeDuration {
1030        #[snafu(source)]
1031        error: prost_types::DurationError,
1032        #[snafu(implicit)]
1033        location: Location,
1034    },
1035
1036    #[cfg(feature = "enterprise")]
1037    #[snafu(display("Negative duration"))]
1038    NegativeDuration {
1039        #[snafu(source)]
1040        error: prost_types::DurationError,
1041        #[snafu(implicit)]
1042        location: Location,
1043    },
1044
1045    #[cfg(feature = "enterprise")]
1046    #[snafu(display("Missing interval field"))]
1047    MissingInterval {
1048        #[snafu(implicit)]
1049        location: Location,
1050    },
1051}
1052
/// Shorthand for a `std::result::Result` whose error type is this module's
/// [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
1054
1055impl ErrorExt for Error {
1056    fn status_code(&self) -> StatusCode {
1057        use Error::*;
1058        match self {
1059            IllegalServerState { .. }
1060            | EtcdTxnOpResponse { .. }
1061            | EtcdFailed { .. }
1062            | EtcdTxnFailed { .. }
1063            | ConnectEtcd { .. }
1064            | MoveValues { .. }
1065            | GetCache { .. }
1066            | SerializeToJson { .. }
1067            | DeserializeFromJson { .. } => StatusCode::Internal,
1068
1069            NoLeader { .. } => StatusCode::TableUnavailable,
1070            ValueNotExist { .. }
1071            | ProcedurePoisonConflict { .. }
1072            | ProcedureStateReceiverNotFound { .. }
1073            | MissingColumnIds { .. }
1074            | MissingColumnInColumnMetadata { .. }
1075            | MismatchColumnId { .. }
1076            | ColumnMetadataConflicts { .. }
1077            | ColumnNotFound { .. }
1078            | ColumnIdMismatch { .. }
1079            | TimestampMismatch { .. } => StatusCode::Unexpected,
1080
1081            Unsupported { .. } => StatusCode::Unsupported,
1082            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1083
1084            SerdeJson { .. }
1085            | ParseOption { .. }
1086            | RouteInfoCorrupted { .. }
1087            | InvalidProtoMsg { .. }
1088            | InvalidMetadata { .. }
1089            | Unexpected { .. }
1090            | TableInfoNotFound { .. }
1091            | NextSequence { .. }
1092            | UnexpectedSequenceValue { .. }
1093            | InvalidHeartbeatResponse { .. }
1094            | EncodeJson { .. }
1095            | DecodeJson { .. }
1096            | PayloadNotExist { .. }
1097            | ConvertRawKey { .. }
1098            | DecodeProto { .. }
1099            | BuildTableMeta { .. }
1100            | TableRouteNotFound { .. }
1101            | ConvertRawTableInfo { .. }
1102            | RegionOperatingRace { .. }
1103            | EncodeWalOptions { .. }
1104            | BuildKafkaClient { .. }
1105            | BuildKafkaCtrlClient { .. }
1106            | KafkaPartitionClient { .. }
1107            | ProduceRecord { .. }
1108            | CreateKafkaWalTopic { .. }
1109            | EmptyTopicPool { .. }
1110            | UnexpectedLogicalRouteTable { .. }
1111            | ProcedureOutput { .. }
1112            | FromUtf8 { .. }
1113            | MetadataCorruption { .. }
1114            | ParseWalOptions { .. }
1115            | KafkaGetOffset { .. }
1116            | ReadFlexbuffers { .. }
1117            | SerializeFlexbuffers { .. }
1118            | DeserializeFlexbuffers { .. }
1119            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1120
1121            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1122
1123            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1124
1125            ProcedureNotFound { .. }
1126            | InvalidViewInfo { .. }
1127            | PrimaryKeyNotFound { .. }
1128            | EmptyKey { .. }
1129            | AlterLogicalTablesInvalidArguments { .. }
1130            | CreateLogicalTablesInvalidArguments { .. }
1131            | MismatchPrefix { .. }
1132            | TlsConfig { .. }
1133            | InvalidSetDatabaseOption { .. }
1134            | InvalidUnsetDatabaseOption { .. }
1135            | InvalidTopicNamePrefix { .. }
1136            | InvalidTimeZone { .. }
1137            | InvalidFileExtension { .. }
1138            | InvalidFileName { .. }
1139            | InvalidFlowRequestBody { .. }
1140            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1141
1142            #[cfg(feature = "enterprise")]
1143            MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1144                StatusCode::InvalidArguments
1145            }
1146
1147            FlowNotFound { .. } => StatusCode::FlowNotFound,
1148            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1149            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1150
1151            ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1152                StatusCode::TableNotFound
1153            }
1154            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1155
1156            SubmitProcedure { source, .. }
1157            | QueryProcedure { source, .. }
1158            | WaitProcedure { source, .. }
1159            | StartProcedureManager { source, .. }
1160            | StopProcedureManager { source, .. } => source.status_code(),
1161            RegisterProcedureLoader { source, .. } => source.status_code(),
1162            External { source, .. } => source.status_code(),
1163            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1164            OperateDatanode { source, .. } => source.status_code(),
1165            Table { source, .. } => source.status_code(),
1166            RetryLater { source, .. } => source.status_code(),
1167            AbortProcedure { source, .. } => source.status_code(),
1168            ConvertAlterTableRequest { source, .. } => source.status_code(),
1169            PutPoison { source, .. } => source.status_code(),
1170            ConvertColumnDef { source, .. } => source.status_code(),
1171            ProcedureStateReceiver { source, .. } => source.status_code(),
1172
1173            ParseProcedureId { .. }
1174            | InvalidNumTopics { .. }
1175            | SchemaNotFound { .. }
1176            | CatalogNotFound { .. }
1177            | InvalidNodeInfoKey { .. }
1178            | InvalidStatKey { .. }
1179            | ParseNum { .. }
1180            | InvalidRole { .. }
1181            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1182
1183            LoadTlsCertificate { .. } => StatusCode::Internal,
1184
1185            #[cfg(feature = "pg_kvbackend")]
1186            PostgresExecution { .. }
1187            | CreatePostgresPool { .. }
1188            | GetPostgresConnection { .. }
1189            | PostgresTransaction { .. }
1190            | PostgresTlsConfig { .. }
1191            | InvalidTlsConfig { .. } => StatusCode::Internal,
1192            #[cfg(feature = "mysql_kvbackend")]
1193            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1194                StatusCode::Internal
1195            }
1196            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1197            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1198            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1199        }
1200    }
1201
1202    fn as_any(&self) -> &dyn std::any::Any {
1203        self
1204    }
1205}
1206
1207impl Error {
1208    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1209    /// Check if the error is a serialization error.
1210    pub fn is_serialization_error(&self) -> bool {
1211        match self {
1212            #[cfg(feature = "pg_kvbackend")]
1213            Error::PostgresTransaction { error, .. } => {
1214                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1215            }
1216            #[cfg(feature = "pg_kvbackend")]
1217            Error::PostgresExecution { error, .. } => {
1218                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1219            }
1220            #[cfg(feature = "mysql_kvbackend")]
1221            Error::MySqlExecution {
1222                error: sqlx::Error::Database(database_error),
1223                ..
1224            } => {
1225                matches!(
1226                    database_error.message(),
1227                    "Deadlock found when trying to get lock; try restarting transaction"
1228                        | "can't serialize access for this transaction"
1229                )
1230            }
1231            _ => false,
1232        }
1233    }
1234
1235    /// Creates a new [Error::RetryLater] error from source `err`.
1236    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1237        Error::RetryLater {
1238            source: BoxedError::new(err),
1239            clean_poisons: false,
1240        }
1241    }
1242
1243    /// Determine whether it is a retry later type through [StatusCode]
1244    pub fn is_retry_later(&self) -> bool {
1245        matches!(self, Error::RetryLater { .. })
1246    }
1247
1248    /// Determine whether it needs to clean poisons.
1249    pub fn need_clean_poisons(&self) -> bool {
1250        matches!(
1251            self,
1252            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1253        ) || matches!(
1254            self,
1255            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1256        )
1257    }
1258
1259    /// Returns true if the response exceeds the size limit.
1260    pub fn is_exceeded_size_limit(&self) -> bool {
1261        match self {
1262            Error::EtcdFailed {
1263                error: etcd_client::Error::GRpcStatus(status),
1264                ..
1265            } => status.code() == tonic::Code::OutOfRange,
1266            Error::ResponseExceededSizeLimit { .. } => true,
1267            _ => false,
1268        }
1269    }
1270}