common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::peer::Peer;
29use crate::DatanodeId;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to submit procedure"))]
108    SubmitProcedure {
109        #[snafu(implicit)]
110        location: Location,
111        source: common_procedure::Error,
112    },
113
114    #[snafu(display("Failed to query procedure"))]
115    QueryProcedure {
116        #[snafu(implicit)]
117        location: Location,
118        source: common_procedure::Error,
119    },
120
121    #[snafu(display("Procedure not found: {pid}"))]
122    ProcedureNotFound {
123        #[snafu(implicit)]
124        location: Location,
125        pid: String,
126    },
127
128    #[snafu(display("Failed to parse procedure id: {key}"))]
129    ParseProcedureId {
130        #[snafu(implicit)]
131        location: Location,
132        key: String,
133        #[snafu(source)]
134        error: common_procedure::ParseIdError,
135    },
136
137    #[snafu(display("Unsupported operation {}", operation))]
138    Unsupported {
139        operation: String,
140        #[snafu(implicit)]
141        location: Location,
142    },
143
144    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145    ProcedureStateReceiver {
146        procedure_id: ProcedureId,
147        #[snafu(implicit)]
148        location: Location,
149        source: common_procedure::Error,
150    },
151
152    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153    ProcedureStateReceiverNotFound {
154        procedure_id: ProcedureId,
155        #[snafu(implicit)]
156        location: Location,
157    },
158
159    #[snafu(display("Failed to wait procedure done"))]
160    WaitProcedure {
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Failed to start procedure manager"))]
167    StartProcedureManager {
168        #[snafu(implicit)]
169        location: Location,
170        source: common_procedure::Error,
171    },
172
173    #[snafu(display("Failed to stop procedure manager"))]
174    StopProcedureManager {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display(
181        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182    ))]
183    ProcedureOutput {
184        procedure_id: String,
185        err_msg: String,
186        #[snafu(implicit)]
187        location: Location,
188    },
189
190    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191    ConvertRawTableInfo {
192        #[snafu(implicit)]
193        location: Location,
194        source: datatypes::Error,
195    },
196
197    #[snafu(display("Primary key '{key}' not found when creating region request"))]
198    PrimaryKeyNotFound {
199        key: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Failed to build table meta for table: {}", table_name))]
205    BuildTableMeta {
206        table_name: String,
207        #[snafu(source)]
208        error: table::metadata::TableMetaBuilderError,
209        #[snafu(implicit)]
210        location: Location,
211    },
212
213    #[snafu(display("Table occurs error"))]
214    Table {
215        #[snafu(implicit)]
216        location: Location,
217        source: table::error::Error,
218    },
219
220    #[snafu(display("Failed to find table route for table id {}", table_id))]
221    TableRouteNotFound {
222        table_id: TableId,
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Failed to decode protobuf"))]
228    DecodeProto {
229        #[snafu(implicit)]
230        location: Location,
231        #[snafu(source)]
232        error: prost::DecodeError,
233    },
234
235    #[snafu(display("Failed to encode object into json"))]
236    EncodeJson {
237        #[snafu(implicit)]
238        location: Location,
239        #[snafu(source)]
240        error: JsonError,
241    },
242
243    #[snafu(display("Failed to decode object from json"))]
244    DecodeJson {
245        #[snafu(implicit)]
246        location: Location,
247        #[snafu(source)]
248        error: JsonError,
249    },
250
251    #[snafu(display("Failed to serialize to json: {}", input))]
252    SerializeToJson {
253        input: String,
254        #[snafu(source)]
255        error: serde_json::error::Error,
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to deserialize from json: {}", input))]
261    DeserializeFromJson {
262        input: String,
263        #[snafu(source)]
264        error: serde_json::error::Error,
265        #[snafu(implicit)]
266        location: Location,
267    },
268
269    #[snafu(display("Payload not exist"))]
270    PayloadNotExist {
271        #[snafu(implicit)]
272        location: Location,
273    },
274
275    #[snafu(display("Failed to send message: {err_msg}"))]
276    SendMessage {
277        err_msg: String,
278        #[snafu(implicit)]
279        location: Location,
280    },
281
282    #[snafu(display("Failed to serde json"))]
283    SerdeJson {
284        #[snafu(source)]
285        error: serde_json::error::Error,
286        #[snafu(implicit)]
287        location: Location,
288    },
289
290    #[snafu(display("Failed to parse value {} into key {}", value, key))]
291    ParseOption {
292        key: String,
293        value: String,
294        #[snafu(implicit)]
295        location: Location,
296    },
297
298    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
299    RouteInfoCorrupted {
300        err_msg: String,
301        #[snafu(implicit)]
302        location: Location,
303    },
304
305    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
306    IllegalServerState {
307        code: i32,
308        err_msg: String,
309        #[snafu(implicit)]
310        location: Location,
311    },
312
313    #[snafu(display("Failed to convert alter table request"))]
314    ConvertAlterTableRequest {
315        source: common_grpc_expr::error::Error,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Invalid protobuf message: {err_msg}"))]
321    InvalidProtoMsg {
322        err_msg: String,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Unexpected: {err_msg}"))]
328    Unexpected {
329        err_msg: String,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("Table already exists, table: {}", table_name))]
335    TableAlreadyExists {
336        table_name: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("View already exists, view: {}", view_name))]
342    ViewAlreadyExists {
343        view_name: String,
344        #[snafu(implicit)]
345        location: Location,
346    },
347
348    #[snafu(display("Flow already exists: {}", flow_name))]
349    FlowAlreadyExists {
350        flow_name: String,
351        #[snafu(implicit)]
352        location: Location,
353    },
354
355    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
356    SchemaAlreadyExists {
357        catalog: String,
358        schema: String,
359        #[snafu(implicit)]
360        location: Location,
361    },
362
363    #[snafu(display("Failed to convert raw key to str"))]
364    ConvertRawKey {
365        #[snafu(implicit)]
366        location: Location,
367        #[snafu(source)]
368        error: Utf8Error,
369    },
370
371    #[snafu(display("Table not found: '{}'", table_name))]
372    TableNotFound {
373        table_name: String,
374        #[snafu(implicit)]
375        location: Location,
376    },
377
378    #[snafu(display("Region not found: {}", region_id))]
379    RegionNotFound {
380        region_id: RegionId,
381        #[snafu(implicit)]
382        location: Location,
383    },
384
385    #[snafu(display("View not found: '{}'", view_name))]
386    ViewNotFound {
387        view_name: String,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("Flow not found: '{}'", flow_name))]
393    FlowNotFound {
394        flow_name: String,
395        #[snafu(implicit)]
396        location: Location,
397    },
398
399    #[snafu(display("Flow route not found: '{}'", flow_name))]
400    FlowRouteNotFound {
401        flow_name: String,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Schema nod found, schema: {}", table_schema))]
407    SchemaNotFound {
408        table_schema: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Catalog not found, catalog: {}", catalog))]
414    CatalogNotFound {
415        catalog: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[snafu(display("Invalid metadata, err: {}", err_msg))]
421    InvalidMetadata {
422        err_msg: String,
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Invalid view info, err: {}", err_msg))]
428    InvalidViewInfo {
429        err_msg: String,
430        #[snafu(implicit)]
431        location: Location,
432    },
433
434    #[snafu(display("Invalid flow request body: {:?}", body))]
435    InvalidFlowRequestBody {
436        body: Box<Option<api::v1::flow::flow_request::Body>>,
437        #[snafu(implicit)]
438        location: Location,
439    },
440
441    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
442    GetKvCache { err_msg: String },
443
444    #[snafu(display("Get null from cache, key: {}", key))]
445    CacheNotGet {
446        key: String,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("Etcd txn error: {err_msg}"))]
452    EtcdTxnOpResponse {
453        err_msg: String,
454        #[snafu(implicit)]
455        location: Location,
456    },
457
458    #[snafu(display("External error"))]
459    External {
460        #[snafu(implicit)]
461        location: Location,
462        source: BoxedError,
463    },
464
465    #[snafu(display("The response exceeded size limit"))]
466    ResponseExceededSizeLimit {
467        #[snafu(implicit)]
468        location: Location,
469        source: BoxedError,
470    },
471
472    #[snafu(display("Invalid heartbeat response"))]
473    InvalidHeartbeatResponse {
474        #[snafu(implicit)]
475        location: Location,
476    },
477
478    #[snafu(display("Failed to operate on datanode: {}", peer))]
479    OperateDatanode {
480        #[snafu(implicit)]
481        location: Location,
482        peer: Peer,
483        source: BoxedError,
484    },
485
486    #[snafu(display("Retry later"))]
487    RetryLater {
488        source: BoxedError,
489        clean_poisons: bool,
490    },
491
492    #[snafu(display("Abort procedure"))]
493    AbortProcedure {
494        #[snafu(implicit)]
495        location: Location,
496        source: BoxedError,
497        clean_poisons: bool,
498    },
499
500    #[snafu(display(
501        "Failed to encode a wal options to json string, wal_options: {:?}",
502        wal_options
503    ))]
504    EncodeWalOptions {
505        wal_options: WalOptions,
506        #[snafu(source)]
507        error: serde_json::Error,
508        #[snafu(implicit)]
509        location: Location,
510    },
511
512    #[snafu(display("Invalid number of topics {}", num_topics))]
513    InvalidNumTopics {
514        num_topics: usize,
515        #[snafu(implicit)]
516        location: Location,
517    },
518
519    #[snafu(display(
520        "Failed to build a Kafka client, broker endpoints: {:?}",
521        broker_endpoints
522    ))]
523    BuildKafkaClient {
524        broker_endpoints: Vec<String>,
525        #[snafu(implicit)]
526        location: Location,
527        #[snafu(source)]
528        error: rskafka::client::error::Error,
529    },
530
531    #[snafu(display("Failed to create TLS Config"))]
532    TlsConfig {
533        #[snafu(implicit)]
534        location: Location,
535        source: common_wal::error::Error,
536    },
537
538    #[snafu(display("Failed to build a Kafka controller client"))]
539    BuildKafkaCtrlClient {
540        #[snafu(implicit)]
541        location: Location,
542        #[snafu(source)]
543        error: rskafka::client::error::Error,
544    },
545
546    #[snafu(display(
547        "Failed to get a Kafka partition client, topic: {}, partition: {}",
548        topic,
549        partition
550    ))]
551    KafkaPartitionClient {
552        topic: String,
553        partition: i32,
554        #[snafu(implicit)]
555        location: Location,
556        #[snafu(source)]
557        error: rskafka::client::error::Error,
558    },
559
560    #[snafu(display(
561        "Failed to get offset from Kafka, topic: {}, partition: {}",
562        topic,
563        partition
564    ))]
565    KafkaGetOffset {
566        topic: String,
567        partition: i32,
568        #[snafu(implicit)]
569        location: Location,
570        #[snafu(source)]
571        error: rskafka::client::error::Error,
572    },
573
574    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
575    ProduceRecord {
576        topic: String,
577        #[snafu(implicit)]
578        location: Location,
579        #[snafu(source)]
580        error: rskafka::client::error::Error,
581    },
582
583    #[snafu(display("Failed to create a Kafka wal topic"))]
584    CreateKafkaWalTopic {
585        #[snafu(implicit)]
586        location: Location,
587        #[snafu(source)]
588        error: rskafka::client::error::Error,
589    },
590
591    #[snafu(display("The topic pool is empty"))]
592    EmptyTopicPool {
593        #[snafu(implicit)]
594        location: Location,
595    },
596
597    #[snafu(display("Unexpected table route type: {}", err_msg))]
598    UnexpectedLogicalRouteTable {
599        #[snafu(implicit)]
600        location: Location,
601        err_msg: String,
602    },
603
604    #[snafu(display("The tasks of {} cannot be empty", name))]
605    EmptyDdlTasks {
606        name: String,
607        #[snafu(implicit)]
608        location: Location,
609    },
610
611    #[snafu(display("Metadata corruption: {}", err_msg))]
612    MetadataCorruption {
613        err_msg: String,
614        #[snafu(implicit)]
615        location: Location,
616    },
617
618    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
619    AlterLogicalTablesInvalidArguments {
620        err_msg: String,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
626    CreateLogicalTablesInvalidArguments {
627        err_msg: String,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Invalid node info key: {}", key))]
633    InvalidNodeInfoKey {
634        key: String,
635        #[snafu(implicit)]
636        location: Location,
637    },
638
639    #[snafu(display("Invalid node stat key: {}", key))]
640    InvalidStatKey {
641        key: String,
642        #[snafu(implicit)]
643        location: Location,
644    },
645
646    #[snafu(display("Failed to parse number: {}", err_msg))]
647    ParseNum {
648        err_msg: String,
649        #[snafu(source)]
650        error: std::num::ParseIntError,
651        #[snafu(implicit)]
652        location: Location,
653    },
654
655    #[snafu(display("Invalid role: {}", role))]
656    InvalidRole {
657        role: i32,
658        #[snafu(implicit)]
659        location: Location,
660    },
661
662    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
663    InvalidSetDatabaseOption {
664        key: String,
665        value: String,
666        #[snafu(implicit)]
667        location: Location,
668    },
669
670    #[snafu(display("Invalid unset database option, key: {}", key))]
671    InvalidUnsetDatabaseOption {
672        key: String,
673        #[snafu(implicit)]
674        location: Location,
675    },
676
677    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
678    MismatchPrefix {
679        prefix: String,
680        key: String,
681        #[snafu(implicit)]
682        location: Location,
683    },
684
685    #[snafu(display("Failed to move values: {err_msg}"))]
686    MoveValues {
687        err_msg: String,
688        #[snafu(implicit)]
689        location: Location,
690    },
691
692    #[snafu(display("Failed to parse {} from utf8", name))]
693    FromUtf8 {
694        name: String,
695        #[snafu(source)]
696        error: std::string::FromUtf8Error,
697        #[snafu(implicit)]
698        location: Location,
699    },
700
701    #[snafu(display("Value not exists"))]
702    ValueNotExist {
703        #[snafu(implicit)]
704        location: Location,
705    },
706
707    #[snafu(display("Failed to get cache"))]
708    GetCache { source: Arc<Error> },
709
710    #[cfg(feature = "pg_kvbackend")]
711    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
712    PostgresExecution {
713        sql: String,
714        #[snafu(source)]
715        error: tokio_postgres::Error,
716        #[snafu(implicit)]
717        location: Location,
718    },
719
720    #[cfg(feature = "pg_kvbackend")]
721    #[snafu(display("Failed to create connection pool for Postgres"))]
722    CreatePostgresPool {
723        #[snafu(source)]
724        error: deadpool_postgres::CreatePoolError,
725        #[snafu(implicit)]
726        location: Location,
727    },
728
729    #[cfg(feature = "pg_kvbackend")]
730    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
731    GetPostgresConnection {
732        reason: String,
733        #[snafu(implicit)]
734        location: Location,
735    },
736
737    #[cfg(feature = "pg_kvbackend")]
738    #[snafu(display("Failed to {} Postgres transaction", operation))]
739    PostgresTransaction {
740        #[snafu(source)]
741        error: tokio_postgres::Error,
742        #[snafu(implicit)]
743        location: Location,
744        operation: String,
745    },
746
747    #[cfg(feature = "pg_kvbackend")]
748    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
749    PostgresTlsConfig {
750        reason: String,
751        #[snafu(implicit)]
752        location: Location,
753    },
754
755    #[cfg(feature = "pg_kvbackend")]
756    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
757    LoadTlsCertificate {
758        path: String,
759        #[snafu(source)]
760        error: std::io::Error,
761        #[snafu(implicit)]
762        location: Location,
763    },
764
765    #[cfg(feature = "pg_kvbackend")]
766    #[snafu(display("Invalid TLS configuration: {}", reason))]
767    InvalidTlsConfig {
768        reason: String,
769        #[snafu(implicit)]
770        location: Location,
771    },
772
773    #[cfg(feature = "mysql_kvbackend")]
774    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
775    MySqlExecution {
776        sql: String,
777        #[snafu(source)]
778        error: sqlx::Error,
779        #[snafu(implicit)]
780        location: Location,
781    },
782
783    #[cfg(feature = "mysql_kvbackend")]
784    #[snafu(display("Failed to create connection pool for MySql"))]
785    CreateMySqlPool {
786        #[snafu(source)]
787        error: sqlx::Error,
788        #[snafu(implicit)]
789        location: Location,
790    },
791
792    #[cfg(feature = "mysql_kvbackend")]
793    #[snafu(display("Failed to {} MySql transaction", operation))]
794    MySqlTransaction {
795        #[snafu(source)]
796        error: sqlx::Error,
797        #[snafu(implicit)]
798        location: Location,
799        operation: String,
800    },
801
802    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
803    #[snafu(display("Rds transaction retry failed"))]
804    RdsTransactionRetryFailed {
805        #[snafu(implicit)]
806        location: Location,
807    },
808
809    #[snafu(display(
810        "Datanode table info not found, table id: {}, datanode id: {}",
811        table_id,
812        datanode_id
813    ))]
814    DatanodeTableInfoNotFound {
815        datanode_id: DatanodeId,
816        table_id: TableId,
817        #[snafu(implicit)]
818        location: Location,
819    },
820
821    #[snafu(display("Invalid topic name prefix: {}", prefix))]
822    InvalidTopicNamePrefix {
823        prefix: String,
824        #[snafu(implicit)]
825        location: Location,
826    },
827
828    #[snafu(display("Failed to parse wal options: {}", wal_options))]
829    ParseWalOptions {
830        wal_options: String,
831        #[snafu(implicit)]
832        location: Location,
833        #[snafu(source)]
834        error: serde_json::Error,
835    },
836
837    #[snafu(display("No leader found for table_id: {}", table_id))]
838    NoLeader {
839        table_id: TableId,
840        #[snafu(implicit)]
841        location: Location,
842    },
843
844    #[snafu(display(
845        "Procedure poison key already exists with a different value, key: {}, value: {}",
846        key,
847        value
848    ))]
849    ProcedurePoisonConflict {
850        key: String,
851        value: String,
852        #[snafu(implicit)]
853        location: Location,
854    },
855
856    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
857    PutPoison {
858        #[snafu(implicit)]
859        location: Location,
860        #[snafu(source)]
861        source: common_procedure::error::Error,
862    },
863
864    #[snafu(display("Failed to parse timezone"))]
865    InvalidTimeZone {
866        #[snafu(implicit)]
867        location: Location,
868        #[snafu(source)]
869        error: common_time::error::Error,
870    },
871    #[snafu(display("Invalid file path: {}", file_path))]
872    InvalidFilePath {
873        #[snafu(implicit)]
874        location: Location,
875        file_path: String,
876    },
877
878    #[snafu(display("Failed to serialize flexbuffers"))]
879    SerializeFlexbuffers {
880        #[snafu(implicit)]
881        location: Location,
882        #[snafu(source)]
883        error: flexbuffers::SerializationError,
884    },
885
886    #[snafu(display("Failed to deserialize flexbuffers"))]
887    DeserializeFlexbuffers {
888        #[snafu(implicit)]
889        location: Location,
890        #[snafu(source)]
891        error: flexbuffers::DeserializationError,
892    },
893
894    #[snafu(display("Failed to read flexbuffers"))]
895    ReadFlexbuffers {
896        #[snafu(implicit)]
897        location: Location,
898        #[snafu(source)]
899        error: flexbuffers::ReaderError,
900    },
901
902    #[snafu(display("Invalid file name: {}", reason))]
903    InvalidFileName {
904        #[snafu(implicit)]
905        location: Location,
906        reason: String,
907    },
908
909    #[snafu(display("Invalid file extension: {}", reason))]
910    InvalidFileExtension {
911        #[snafu(implicit)]
912        location: Location,
913        reason: String,
914    },
915
916    #[snafu(display("Failed to write object, file path: {}", file_path))]
917    WriteObject {
918        #[snafu(implicit)]
919        location: Location,
920        file_path: String,
921        #[snafu(source)]
922        error: object_store::Error,
923    },
924
925    #[snafu(display("Failed to read object, file path: {}", file_path))]
926    ReadObject {
927        #[snafu(implicit)]
928        location: Location,
929        file_path: String,
930        #[snafu(source)]
931        error: object_store::Error,
932    },
933
934    #[snafu(display("Missing column ids"))]
935    MissingColumnIds {
936        #[snafu(implicit)]
937        location: Location,
938    },
939
940    #[snafu(display(
941        "Missing column in column metadata: {}, table: {}, table_id: {}",
942        column_name,
943        table_name,
944        table_id,
945    ))]
946    MissingColumnInColumnMetadata {
947        column_name: String,
948        #[snafu(implicit)]
949        location: Location,
950        table_name: String,
951        table_id: TableId,
952    },
953
954    #[snafu(display(
955        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
956        column_name,
957        column_id,
958        table_name,
959        table_id,
960    ))]
961    MismatchColumnId {
962        column_name: String,
963        column_id: u32,
964        #[snafu(implicit)]
965        location: Location,
966        table_name: String,
967        table_id: TableId,
968    },
969
970    #[snafu(display("Failed to convert column def, column: {}", column))]
971    ConvertColumnDef {
972        column: String,
973        #[snafu(implicit)]
974        location: Location,
975        source: api::error::Error,
976    },
977
978    #[snafu(display("Failed to convert time ranges"))]
979    ConvertTimeRanges {
980        #[snafu(implicit)]
981        location: Location,
982        source: api::error::Error,
983    },
984
985    #[snafu(display(
986        "Column metadata inconsistencies found in table: {}, table_id: {}",
987        table_name,
988        table_id
989    ))]
990    ColumnMetadataConflicts {
991        table_name: String,
992        table_id: TableId,
993    },
994
995    #[snafu(display(
996        "Column not found in column metadata, column_name: {}, column_id: {}",
997        column_name,
998        column_id
999    ))]
1000    ColumnNotFound { column_name: String, column_id: u32 },
1001
1002    #[snafu(display(
1003        "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1004        column_name,
1005        expected_column_id,
1006        actual_column_id
1007    ))]
1008    ColumnIdMismatch {
1009        column_name: String,
1010        expected_column_id: u32,
1011        actual_column_id: u32,
1012    },
1013
1014    #[snafu(display(
1015        "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1016        expected_column_name,
1017        expected_column_id,
1018        actual_column_name,
1019        actual_column_id,
1020    ))]
1021    TimestampMismatch {
1022        expected_column_name: String,
1023        expected_column_id: u32,
1024        actual_column_name: String,
1025        actual_column_id: u32,
1026    },
1027
1028    #[cfg(feature = "enterprise")]
1029    #[snafu(display("Too large duration"))]
1030    TooLargeDuration {
1031        #[snafu(source)]
1032        error: prost_types::DurationError,
1033        #[snafu(implicit)]
1034        location: Location,
1035    },
1036
1037    #[cfg(feature = "enterprise")]
1038    #[snafu(display("Negative duration"))]
1039    NegativeDuration {
1040        #[snafu(source)]
1041        error: prost_types::DurationError,
1042        #[snafu(implicit)]
1043        location: Location,
1044    },
1045
1046    #[cfg(feature = "enterprise")]
1047    #[snafu(display("Missing interval field"))]
1048    MissingInterval {
1049        #[snafu(implicit)]
1050        location: Location,
1051    },
1052}
1053
1054pub type Result<T> = std::result::Result<T, Error>;
1055
1056impl ErrorExt for Error {
1057    fn status_code(&self) -> StatusCode {
1058        use Error::*;
1059        match self {
1060            IllegalServerState { .. }
1061            | EtcdTxnOpResponse { .. }
1062            | EtcdFailed { .. }
1063            | EtcdTxnFailed { .. }
1064            | ConnectEtcd { .. }
1065            | MoveValues { .. }
1066            | GetCache { .. }
1067            | SerializeToJson { .. }
1068            | DeserializeFromJson { .. } => StatusCode::Internal,
1069
1070            NoLeader { .. } => StatusCode::TableUnavailable,
1071            ValueNotExist { .. }
1072            | ProcedurePoisonConflict { .. }
1073            | ProcedureStateReceiverNotFound { .. }
1074            | MissingColumnIds { .. }
1075            | MissingColumnInColumnMetadata { .. }
1076            | MismatchColumnId { .. }
1077            | ColumnMetadataConflicts { .. }
1078            | ColumnNotFound { .. }
1079            | ColumnIdMismatch { .. }
1080            | TimestampMismatch { .. } => StatusCode::Unexpected,
1081
1082            Unsupported { .. } => StatusCode::Unsupported,
1083            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1084
1085            SerdeJson { .. }
1086            | ParseOption { .. }
1087            | RouteInfoCorrupted { .. }
1088            | InvalidProtoMsg { .. }
1089            | InvalidMetadata { .. }
1090            | Unexpected { .. }
1091            | TableInfoNotFound { .. }
1092            | NextSequence { .. }
1093            | UnexpectedSequenceValue { .. }
1094            | InvalidHeartbeatResponse { .. }
1095            | EncodeJson { .. }
1096            | DecodeJson { .. }
1097            | PayloadNotExist { .. }
1098            | ConvertRawKey { .. }
1099            | DecodeProto { .. }
1100            | BuildTableMeta { .. }
1101            | TableRouteNotFound { .. }
1102            | ConvertRawTableInfo { .. }
1103            | RegionOperatingRace { .. }
1104            | EncodeWalOptions { .. }
1105            | BuildKafkaClient { .. }
1106            | BuildKafkaCtrlClient { .. }
1107            | KafkaPartitionClient { .. }
1108            | ProduceRecord { .. }
1109            | CreateKafkaWalTopic { .. }
1110            | EmptyTopicPool { .. }
1111            | UnexpectedLogicalRouteTable { .. }
1112            | ProcedureOutput { .. }
1113            | FromUtf8 { .. }
1114            | MetadataCorruption { .. }
1115            | ParseWalOptions { .. }
1116            | KafkaGetOffset { .. }
1117            | ReadFlexbuffers { .. }
1118            | SerializeFlexbuffers { .. }
1119            | DeserializeFlexbuffers { .. }
1120            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1121
1122            SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1123
1124            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1125
1126            ProcedureNotFound { .. }
1127            | InvalidViewInfo { .. }
1128            | PrimaryKeyNotFound { .. }
1129            | EmptyKey { .. }
1130            | AlterLogicalTablesInvalidArguments { .. }
1131            | CreateLogicalTablesInvalidArguments { .. }
1132            | MismatchPrefix { .. }
1133            | TlsConfig { .. }
1134            | InvalidSetDatabaseOption { .. }
1135            | InvalidUnsetDatabaseOption { .. }
1136            | InvalidTopicNamePrefix { .. }
1137            | InvalidTimeZone { .. }
1138            | InvalidFileExtension { .. }
1139            | InvalidFileName { .. }
1140            | InvalidFlowRequestBody { .. }
1141            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1142
1143            #[cfg(feature = "enterprise")]
1144            MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1145                StatusCode::InvalidArguments
1146            }
1147
1148            FlowNotFound { .. } => StatusCode::FlowNotFound,
1149            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1150            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1151
1152            ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1153                StatusCode::TableNotFound
1154            }
1155            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1156
1157            SubmitProcedure { source, .. }
1158            | QueryProcedure { source, .. }
1159            | WaitProcedure { source, .. }
1160            | StartProcedureManager { source, .. }
1161            | StopProcedureManager { source, .. } => source.status_code(),
1162            RegisterProcedureLoader { source, .. } => source.status_code(),
1163            External { source, .. } => source.status_code(),
1164            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1165            OperateDatanode { source, .. } => source.status_code(),
1166            Table { source, .. } => source.status_code(),
1167            RetryLater { source, .. } => source.status_code(),
1168            AbortProcedure { source, .. } => source.status_code(),
1169            ConvertAlterTableRequest { source, .. } => source.status_code(),
1170            PutPoison { source, .. } => source.status_code(),
1171            ConvertColumnDef { source, .. } => source.status_code(),
1172            ProcedureStateReceiver { source, .. } => source.status_code(),
1173
1174            ParseProcedureId { .. }
1175            | InvalidNumTopics { .. }
1176            | SchemaNotFound { .. }
1177            | CatalogNotFound { .. }
1178            | InvalidNodeInfoKey { .. }
1179            | InvalidStatKey { .. }
1180            | ParseNum { .. }
1181            | InvalidRole { .. }
1182            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1183
1184            #[cfg(feature = "pg_kvbackend")]
1185            PostgresExecution { .. }
1186            | CreatePostgresPool { .. }
1187            | GetPostgresConnection { .. }
1188            | PostgresTransaction { .. }
1189            | PostgresTlsConfig { .. }
1190            | LoadTlsCertificate { .. }
1191            | InvalidTlsConfig { .. } => StatusCode::Internal,
1192            #[cfg(feature = "mysql_kvbackend")]
1193            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1194                StatusCode::Internal
1195            }
1196            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1197            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1198            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1199        }
1200    }
1201
1202    fn as_any(&self) -> &dyn std::any::Any {
1203        self
1204    }
1205}
1206
1207impl Error {
1208    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1209    /// Check if the error is a serialization error.
1210    pub fn is_serialization_error(&self) -> bool {
1211        match self {
1212            #[cfg(feature = "pg_kvbackend")]
1213            Error::PostgresTransaction { error, .. } => {
1214                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1215            }
1216            #[cfg(feature = "pg_kvbackend")]
1217            Error::PostgresExecution { error, .. } => {
1218                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1219            }
1220            #[cfg(feature = "mysql_kvbackend")]
1221            Error::MySqlExecution {
1222                error: sqlx::Error::Database(database_error),
1223                ..
1224            } => {
1225                matches!(
1226                    database_error.message(),
1227                    "Deadlock found when trying to get lock; try restarting transaction"
1228                        | "can't serialize access for this transaction"
1229                )
1230            }
1231            _ => false,
1232        }
1233    }
1234
1235    /// Creates a new [Error::RetryLater] error from source `err`.
1236    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1237        Error::RetryLater {
1238            source: BoxedError::new(err),
1239            clean_poisons: false,
1240        }
1241    }
1242
1243    /// Determine whether it is a retry later type through [StatusCode]
1244    pub fn is_retry_later(&self) -> bool {
1245        matches!(self, Error::RetryLater { .. })
1246    }
1247
1248    /// Determine whether it needs to clean poisons.
1249    pub fn need_clean_poisons(&self) -> bool {
1250        matches!(
1251            self,
1252            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1253        ) || matches!(
1254            self,
1255            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1256        )
1257    }
1258
1259    /// Returns true if the response exceeds the size limit.
1260    pub fn is_exceeded_size_limit(&self) -> bool {
1261        match self {
1262            Error::EtcdFailed {
1263                error: etcd_client::Error::GRpcStatus(status),
1264                ..
1265            } => status.code() == tonic::Code::OutOfRange,
1266            Error::ResponseExceededSizeLimit { .. } => true,
1267            _ => false,
1268        }
1269    }
1270}