common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to register repartition procedure loader"))]
108    RegisterRepartitionProcedureLoader {
109        #[snafu(implicit)]
110        location: Location,
111        source: BoxedError,
112    },
113
114    #[snafu(display("Failed to create repartition procedure"))]
115    CreateRepartitionProcedure {
116        source: BoxedError,
117        #[snafu(implicit)]
118        location: Location,
119    },
120
121    #[snafu(display("Failed to submit procedure"))]
122    SubmitProcedure {
123        #[snafu(implicit)]
124        location: Location,
125        source: common_procedure::Error,
126    },
127
128    #[snafu(display("Failed to query procedure"))]
129    QueryProcedure {
130        #[snafu(implicit)]
131        location: Location,
132        source: common_procedure::Error,
133    },
134
135    #[snafu(display("Procedure not found: {pid}"))]
136    ProcedureNotFound {
137        #[snafu(implicit)]
138        location: Location,
139        pid: String,
140    },
141
142    #[snafu(display("Failed to parse procedure id: {key}"))]
143    ParseProcedureId {
144        #[snafu(implicit)]
145        location: Location,
146        key: String,
147        #[snafu(source)]
148        error: common_procedure::ParseIdError,
149    },
150
151    #[snafu(display("Unsupported operation {}", operation))]
152    Unsupported {
153        operation: String,
154        #[snafu(implicit)]
155        location: Location,
156    },
157
158    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
159    ProcedureStateReceiver {
160        procedure_id: ProcedureId,
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
167    ProcedureStateReceiverNotFound {
168        procedure_id: ProcedureId,
169        #[snafu(implicit)]
170        location: Location,
171    },
172
173    #[snafu(display("Failed to wait procedure done"))]
174    WaitProcedure {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display("Failed to start procedure manager"))]
181    StartProcedureManager {
182        #[snafu(implicit)]
183        location: Location,
184        source: common_procedure::Error,
185    },
186
187    #[snafu(display("Failed to stop procedure manager"))]
188    StopProcedureManager {
189        #[snafu(implicit)]
190        location: Location,
191        source: common_procedure::Error,
192    },
193
194    #[snafu(display(
195        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
196    ))]
197    ProcedureOutput {
198        procedure_id: String,
199        err_msg: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Primary key '{key}' not found when creating region request"))]
205    PrimaryKeyNotFound {
206        key: String,
207        #[snafu(implicit)]
208        location: Location,
209    },
210
211    #[snafu(display("Failed to build table meta for table: {}", table_name))]
212    BuildTableMeta {
213        table_name: String,
214        #[snafu(source)]
215        error: table::metadata::TableMetaBuilderError,
216        #[snafu(implicit)]
217        location: Location,
218    },
219
220    #[snafu(display("Table occurs error"))]
221    Table {
222        #[snafu(implicit)]
223        location: Location,
224        source: table::error::Error,
225    },
226
227    #[snafu(display("Failed to find table route for table id {}", table_id))]
228    TableRouteNotFound {
229        table_id: TableId,
230        #[snafu(implicit)]
231        location: Location,
232    },
233
234    #[snafu(display("Failed to find table repartition metadata for table id {}", table_id))]
235    TableRepartNotFound {
236        table_id: TableId,
237        #[snafu(implicit)]
238        location: Location,
239    },
240
241    #[snafu(display("Failed to decode protobuf"))]
242    DecodeProto {
243        #[snafu(implicit)]
244        location: Location,
245        #[snafu(source)]
246        error: prost::DecodeError,
247    },
248
249    #[snafu(display("Failed to encode object into json"))]
250    EncodeJson {
251        #[snafu(implicit)]
252        location: Location,
253        #[snafu(source)]
254        error: JsonError,
255    },
256
257    #[snafu(display("Failed to decode object from json"))]
258    DecodeJson {
259        #[snafu(implicit)]
260        location: Location,
261        #[snafu(source)]
262        error: JsonError,
263    },
264
265    #[snafu(display("Failed to serialize to json: {}", input))]
266    SerializeToJson {
267        input: String,
268        #[snafu(source)]
269        error: serde_json::error::Error,
270        #[snafu(implicit)]
271        location: Location,
272    },
273
274    #[snafu(display("Failed to deserialize from json: {}", input))]
275    DeserializeFromJson {
276        input: String,
277        #[snafu(source)]
278        error: serde_json::error::Error,
279        #[snafu(implicit)]
280        location: Location,
281    },
282
283    #[snafu(display("Payload not exist"))]
284    PayloadNotExist {
285        #[snafu(implicit)]
286        location: Location,
287    },
288
289    #[snafu(display("Failed to serde json"))]
290    SerdeJson {
291        #[snafu(source)]
292        error: serde_json::error::Error,
293        #[snafu(implicit)]
294        location: Location,
295    },
296
297    #[snafu(display("Failed to parse value {} into key {}", value, key))]
298    ParseOption {
299        key: String,
300        value: String,
301        #[snafu(implicit)]
302        location: Location,
303    },
304
305    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
306    RouteInfoCorrupted {
307        err_msg: String,
308        #[snafu(implicit)]
309        location: Location,
310    },
311
312    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
313    IllegalServerState {
314        code: i32,
315        err_msg: String,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Failed to convert alter table request"))]
321    ConvertAlterTableRequest {
322        source: common_grpc_expr::error::Error,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Invalid protobuf message: {err_msg}"))]
328    InvalidProtoMsg {
329        err_msg: String,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("Unexpected: {err_msg}"))]
335    Unexpected {
336        err_msg: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("Table already exists, table: {}", table_name))]
342    TableAlreadyExists {
343        table_name: String,
344        #[snafu(implicit)]
345        location: Location,
346    },
347
348    #[snafu(display("View already exists, view: {}", view_name))]
349    ViewAlreadyExists {
350        view_name: String,
351        #[snafu(implicit)]
352        location: Location,
353    },
354
355    #[snafu(display("Flow already exists: {}", flow_name))]
356    FlowAlreadyExists {
357        flow_name: String,
358        #[snafu(implicit)]
359        location: Location,
360    },
361
362    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
363    SchemaAlreadyExists {
364        catalog: String,
365        schema: String,
366        #[snafu(implicit)]
367        location: Location,
368    },
369
370    #[snafu(display("Failed to convert raw key to str"))]
371    ConvertRawKey {
372        #[snafu(implicit)]
373        location: Location,
374        #[snafu(source)]
375        error: Utf8Error,
376    },
377
378    #[snafu(display("Table not found: '{}'", table_name))]
379    TableNotFound {
380        table_name: String,
381        #[snafu(implicit)]
382        location: Location,
383    },
384
385    #[snafu(display("Region not found: {}", region_id))]
386    RegionNotFound {
387        region_id: RegionId,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("View not found: '{}'", view_name))]
393    ViewNotFound {
394        view_name: String,
395        #[snafu(implicit)]
396        location: Location,
397    },
398
399    #[snafu(display("Flow not found: '{}'", flow_name))]
400    FlowNotFound {
401        flow_name: String,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Flow route not found: '{}'", flow_name))]
407    FlowRouteNotFound {
408        flow_name: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Schema nod found, schema: {}", table_schema))]
414    SchemaNotFound {
415        table_schema: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[snafu(display("Catalog not found, catalog: {}", catalog))]
421    CatalogNotFound {
422        catalog: String,
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Invalid metadata, err: {}", err_msg))]
428    InvalidMetadata {
429        err_msg: String,
430        #[snafu(implicit)]
431        location: Location,
432    },
433
434    #[snafu(display("Invalid view info, err: {}", err_msg))]
435    InvalidViewInfo {
436        err_msg: String,
437        #[snafu(implicit)]
438        location: Location,
439    },
440
441    #[snafu(display("Invalid flow request body: {:?}", body))]
442    InvalidFlowRequestBody {
443        body: Box<Option<api::v1::flow::flow_request::Body>>,
444        #[snafu(implicit)]
445        location: Location,
446    },
447
448    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
449    GetKvCache { err_msg: String },
450
451    #[snafu(display("Get null from cache, key: {}", key))]
452    CacheNotGet {
453        key: String,
454        #[snafu(implicit)]
455        location: Location,
456    },
457
458    #[snafu(display("Etcd txn error: {err_msg}"))]
459    EtcdTxnOpResponse {
460        err_msg: String,
461        #[snafu(implicit)]
462        location: Location,
463    },
464
465    #[snafu(display("External error"))]
466    External {
467        #[snafu(implicit)]
468        location: Location,
469        source: BoxedError,
470    },
471
472    #[snafu(display("The response exceeded size limit"))]
473    ResponseExceededSizeLimit {
474        #[snafu(implicit)]
475        location: Location,
476        source: BoxedError,
477    },
478
479    #[snafu(display("Invalid heartbeat response"))]
480    InvalidHeartbeatResponse {
481        #[snafu(implicit)]
482        location: Location,
483    },
484
485    #[snafu(display("Failed to operate on datanode: {}", peer))]
486    OperateDatanode {
487        #[snafu(implicit)]
488        location: Location,
489        peer: Peer,
490        source: BoxedError,
491    },
492
493    #[snafu(display("Retry later"))]
494    RetryLater {
495        source: BoxedError,
496        clean_poisons: bool,
497    },
498
499    #[snafu(display("Abort procedure"))]
500    AbortProcedure {
501        #[snafu(implicit)]
502        location: Location,
503        source: BoxedError,
504        clean_poisons: bool,
505    },
506
507    #[snafu(display(
508        "Failed to encode a wal options to json string, wal_options: {:?}",
509        wal_options
510    ))]
511    EncodeWalOptions {
512        wal_options: WalOptions,
513        #[snafu(source)]
514        error: serde_json::Error,
515        #[snafu(implicit)]
516        location: Location,
517    },
518
519    #[snafu(display("Invalid number of topics {}", num_topics))]
520    InvalidNumTopics {
521        num_topics: usize,
522        #[snafu(implicit)]
523        location: Location,
524    },
525
526    #[snafu(display(
527        "Failed to build a Kafka client, broker endpoints: {:?}",
528        broker_endpoints
529    ))]
530    BuildKafkaClient {
531        broker_endpoints: Vec<String>,
532        #[snafu(implicit)]
533        location: Location,
534        #[snafu(source)]
535        error: rskafka::client::error::Error,
536    },
537
538    #[snafu(display("Failed to create TLS Config"))]
539    TlsConfig {
540        #[snafu(implicit)]
541        location: Location,
542        source: common_wal::error::Error,
543    },
544
545    #[snafu(display("Failed to build a Kafka controller client"))]
546    BuildKafkaCtrlClient {
547        #[snafu(implicit)]
548        location: Location,
549        #[snafu(source)]
550        error: rskafka::client::error::Error,
551    },
552
553    #[snafu(display(
554        "Failed to get a Kafka partition client, topic: {}, partition: {}",
555        topic,
556        partition
557    ))]
558    KafkaPartitionClient {
559        topic: String,
560        partition: i32,
561        #[snafu(implicit)]
562        location: Location,
563        #[snafu(source)]
564        error: rskafka::client::error::Error,
565    },
566
567    #[snafu(display(
568        "Failed to get offset from Kafka, topic: {}, partition: {}",
569        topic,
570        partition
571    ))]
572    KafkaGetOffset {
573        topic: String,
574        partition: i32,
575        #[snafu(implicit)]
576        location: Location,
577        #[snafu(source)]
578        error: rskafka::client::error::Error,
579    },
580
581    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
582    ProduceRecord {
583        topic: String,
584        #[snafu(implicit)]
585        location: Location,
586        #[snafu(source)]
587        error: rskafka::client::error::Error,
588    },
589
590    #[snafu(display("Failed to create a Kafka wal topic"))]
591    CreateKafkaWalTopic {
592        #[snafu(implicit)]
593        location: Location,
594        #[snafu(source)]
595        error: rskafka::client::error::Error,
596    },
597
598    #[snafu(display("The topic pool is empty"))]
599    EmptyTopicPool {
600        #[snafu(implicit)]
601        location: Location,
602    },
603
604    #[snafu(display("Unexpected table route type: {}", err_msg))]
605    UnexpectedLogicalRouteTable {
606        #[snafu(implicit)]
607        location: Location,
608        err_msg: String,
609    },
610
611    #[snafu(display("The tasks of {} cannot be empty", name))]
612    EmptyDdlTasks {
613        name: String,
614        #[snafu(implicit)]
615        location: Location,
616    },
617
618    #[snafu(display("Metadata corruption: {}", err_msg))]
619    MetadataCorruption {
620        err_msg: String,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
626    AlterLogicalTablesInvalidArguments {
627        err_msg: String,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
633    CreateLogicalTablesInvalidArguments {
634        err_msg: String,
635        #[snafu(implicit)]
636        location: Location,
637    },
638
639    #[snafu(display("Invalid node info key: {}", key))]
640    InvalidNodeInfoKey {
641        key: String,
642        #[snafu(implicit)]
643        location: Location,
644    },
645
646    #[snafu(display("Invalid node stat key: {}", key))]
647    InvalidStatKey {
648        key: String,
649        #[snafu(implicit)]
650        location: Location,
651    },
652
653    #[snafu(display("Failed to parse number: {}", err_msg))]
654    ParseNum {
655        err_msg: String,
656        #[snafu(source)]
657        error: std::num::ParseIntError,
658        #[snafu(implicit)]
659        location: Location,
660    },
661
662    #[snafu(display("Invalid role: {}", role))]
663    InvalidRole {
664        role: i32,
665        #[snafu(implicit)]
666        location: Location,
667    },
668
669    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
670    InvalidSetDatabaseOption {
671        key: String,
672        value: String,
673        #[snafu(implicit)]
674        location: Location,
675    },
676
677    #[snafu(display("Invalid unset database option, key: {}", key))]
678    InvalidUnsetDatabaseOption {
679        key: String,
680        #[snafu(implicit)]
681        location: Location,
682    },
683
684    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
685    MismatchPrefix {
686        prefix: String,
687        key: String,
688        #[snafu(implicit)]
689        location: Location,
690    },
691
692    #[snafu(display("Failed to move values: {err_msg}"))]
693    MoveValues {
694        err_msg: String,
695        #[snafu(implicit)]
696        location: Location,
697    },
698
699    #[snafu(display("Failed to parse {} from utf8", name))]
700    FromUtf8 {
701        name: String,
702        #[snafu(source)]
703        error: std::string::FromUtf8Error,
704        #[snafu(implicit)]
705        location: Location,
706    },
707
708    #[snafu(display("Value not exists"))]
709    ValueNotExist {
710        #[snafu(implicit)]
711        location: Location,
712    },
713
714    #[snafu(display("Failed to get cache"))]
715    GetCache { source: Arc<Error> },
716
717    #[cfg(feature = "pg_kvbackend")]
718    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
719    PostgresExecution {
720        sql: String,
721        #[snafu(source)]
722        error: tokio_postgres::Error,
723        #[snafu(implicit)]
724        location: Location,
725    },
726
727    #[cfg(feature = "pg_kvbackend")]
728    #[snafu(display("Failed to create connection pool for Postgres"))]
729    CreatePostgresPool {
730        #[snafu(source)]
731        error: deadpool_postgres::CreatePoolError,
732        #[snafu(implicit)]
733        location: Location,
734    },
735
736    #[cfg(feature = "pg_kvbackend")]
737    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
738    GetPostgresConnection {
739        reason: String,
740        #[snafu(implicit)]
741        location: Location,
742    },
743
744    #[cfg(feature = "pg_kvbackend")]
745    #[snafu(display("Failed to {} Postgres transaction", operation))]
746    PostgresTransaction {
747        #[snafu(source)]
748        error: tokio_postgres::Error,
749        #[snafu(implicit)]
750        location: Location,
751        operation: String,
752    },
753
754    #[cfg(feature = "pg_kvbackend")]
755    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
756    PostgresTlsConfig {
757        reason: String,
758        #[snafu(implicit)]
759        location: Location,
760    },
761
762    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
763    LoadTlsCertificate {
764        path: String,
765        #[snafu(source)]
766        error: std::io::Error,
767        #[snafu(implicit)]
768        location: Location,
769    },
770
771    #[cfg(feature = "pg_kvbackend")]
772    #[snafu(display("Invalid TLS configuration: {}", reason))]
773    InvalidTlsConfig {
774        reason: String,
775        #[snafu(implicit)]
776        location: Location,
777    },
778
779    #[cfg(feature = "mysql_kvbackend")]
780    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
781    MySqlExecution {
782        sql: String,
783        #[snafu(source)]
784        error: sqlx::Error,
785        #[snafu(implicit)]
786        location: Location,
787    },
788
789    #[cfg(feature = "mysql_kvbackend")]
790    #[snafu(display("Failed to create connection pool for MySql"))]
791    CreateMySqlPool {
792        #[snafu(source)]
793        error: sqlx::Error,
794        #[snafu(implicit)]
795        location: Location,
796    },
797
798    #[cfg(feature = "mysql_kvbackend")]
799    #[snafu(display("Failed to {} MySql transaction", operation))]
800    MySqlTransaction {
801        #[snafu(source)]
802        error: sqlx::Error,
803        #[snafu(implicit)]
804        location: Location,
805        operation: String,
806    },
807
808    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
809    #[snafu(display("Rds transaction retry failed"))]
810    RdsTransactionRetryFailed {
811        #[snafu(implicit)]
812        location: Location,
813    },
814
815    #[snafu(display(
816        "Datanode table info not found, table id: {}, datanode id: {}",
817        table_id,
818        datanode_id
819    ))]
820    DatanodeTableInfoNotFound {
821        datanode_id: DatanodeId,
822        table_id: TableId,
823        #[snafu(implicit)]
824        location: Location,
825    },
826
827    #[snafu(display("Invalid topic name prefix: {}", prefix))]
828    InvalidTopicNamePrefix {
829        prefix: String,
830        #[snafu(implicit)]
831        location: Location,
832    },
833
834    #[snafu(display("Failed to parse wal options: {}", wal_options))]
835    ParseWalOptions {
836        wal_options: String,
837        #[snafu(implicit)]
838        location: Location,
839        #[snafu(source)]
840        error: serde_json::Error,
841    },
842
843    #[snafu(display("No leader found for table_id: {}", table_id))]
844    NoLeader {
845        table_id: TableId,
846        #[snafu(implicit)]
847        location: Location,
848    },
849
850    #[snafu(display(
851        "Procedure poison key already exists with a different value, key: {}, value: {}",
852        key,
853        value
854    ))]
855    ProcedurePoisonConflict {
856        key: String,
857        value: String,
858        #[snafu(implicit)]
859        location: Location,
860    },
861
862    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
863    PutPoison {
864        #[snafu(implicit)]
865        location: Location,
866        #[snafu(source)]
867        source: common_procedure::error::Error,
868    },
869
870    #[snafu(display("Invalid file path: {}", file_path))]
871    InvalidFilePath {
872        #[snafu(implicit)]
873        location: Location,
874        file_path: String,
875    },
876
877    #[snafu(display("Failed to serialize flexbuffers"))]
878    SerializeFlexbuffers {
879        #[snafu(implicit)]
880        location: Location,
881        #[snafu(source)]
882        error: flexbuffers::SerializationError,
883    },
884
885    #[snafu(display("Failed to deserialize flexbuffers"))]
886    DeserializeFlexbuffers {
887        #[snafu(implicit)]
888        location: Location,
889        #[snafu(source)]
890        error: flexbuffers::DeserializationError,
891    },
892
893    #[snafu(display("Failed to read flexbuffers"))]
894    ReadFlexbuffers {
895        #[snafu(implicit)]
896        location: Location,
897        #[snafu(source)]
898        error: flexbuffers::ReaderError,
899    },
900
901    #[snafu(display("Invalid file name: {}", reason))]
902    InvalidFileName {
903        #[snafu(implicit)]
904        location: Location,
905        reason: String,
906    },
907
908    #[snafu(display("Invalid file extension: {}", reason))]
909    InvalidFileExtension {
910        #[snafu(implicit)]
911        location: Location,
912        reason: String,
913    },
914
915    #[snafu(display("Failed to write object, file path: {}", file_path))]
916    WriteObject {
917        #[snafu(implicit)]
918        location: Location,
919        file_path: String,
920        #[snafu(source)]
921        error: object_store::Error,
922    },
923
924    #[snafu(display("Failed to read object, file path: {}", file_path))]
925    ReadObject {
926        #[snafu(implicit)]
927        location: Location,
928        file_path: String,
929        #[snafu(source)]
930        error: object_store::Error,
931    },
932
933    #[snafu(display("Missing column ids"))]
934    MissingColumnIds {
935        #[snafu(implicit)]
936        location: Location,
937    },
938
939    #[snafu(display(
940        "Missing column in column metadata: {}, table: {}, table_id: {}",
941        column_name,
942        table_name,
943        table_id,
944    ))]
945    MissingColumnInColumnMetadata {
946        column_name: String,
947        #[snafu(implicit)]
948        location: Location,
949        table_name: String,
950        table_id: TableId,
951    },
952
953    #[snafu(display(
954        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
955        column_name,
956        column_id,
957        table_name,
958        table_id,
959    ))]
960    MismatchColumnId {
961        column_name: String,
962        column_id: u32,
963        #[snafu(implicit)]
964        location: Location,
965        table_name: String,
966        table_id: TableId,
967    },
968
969    #[snafu(display("Failed to convert column def, column: {}", column))]
970    ConvertColumnDef {
971        column: String,
972        #[snafu(implicit)]
973        location: Location,
974        source: api::error::Error,
975    },
976
977    #[snafu(display("Failed to convert time ranges"))]
978    ConvertTimeRanges {
979        #[snafu(implicit)]
980        location: Location,
981        source: api::error::Error,
982    },
983
984    #[snafu(display(
985        "Column metadata inconsistencies found in table: {}, table_id: {}",
986        table_name,
987        table_id
988    ))]
989    ColumnMetadataConflicts {
990        table_name: String,
991        table_id: TableId,
992    },
993
994    #[snafu(display(
995        "Column not found in column metadata, column_name: {}, column_id: {}",
996        column_name,
997        column_id
998    ))]
999    ColumnNotFound { column_name: String, column_id: u32 },
1000
1001    #[snafu(display(
1002        "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1003        column_name,
1004        expected_column_id,
1005        actual_column_id
1006    ))]
1007    ColumnIdMismatch {
1008        column_name: String,
1009        expected_column_id: u32,
1010        actual_column_id: u32,
1011    },
1012
1013    #[snafu(display(
1014        "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1015        expected_column_name,
1016        expected_column_id,
1017        actual_column_name,
1018        actual_column_id,
1019    ))]
1020    TimestampMismatch {
1021        expected_column_name: String,
1022        expected_column_id: u32,
1023        actual_column_name: String,
1024        actual_column_id: u32,
1025    },
1026
1027    #[cfg(feature = "enterprise")]
1028    #[snafu(display("Too large duration"))]
1029    TooLargeDuration {
1030        #[snafu(source)]
1031        error: prost_types::DurationError,
1032        #[snafu(implicit)]
1033        location: Location,
1034    },
1035
1036    #[cfg(feature = "enterprise")]
1037    #[snafu(display("Negative duration"))]
1038    NegativeDuration {
1039        #[snafu(source)]
1040        error: prost_types::DurationError,
1041        #[snafu(implicit)]
1042        location: Location,
1043    },
1044
1045    #[cfg(feature = "enterprise")]
1046    #[snafu(display("Missing interval field"))]
1047    MissingInterval {
1048        #[snafu(implicit)]
1049        location: Location,
1050    },
1051}
1052
1053pub type Result<T> = std::result::Result<T, Error>;
1054
1055impl ErrorExt for Error {
1056    fn status_code(&self) -> StatusCode {
1057        use Error::*;
1058        match self {
1059            IllegalServerState { .. }
1060            | EtcdTxnOpResponse { .. }
1061            | EtcdFailed { .. }
1062            | EtcdTxnFailed { .. }
1063            | ConnectEtcd { .. }
1064            | MoveValues { .. }
1065            | GetCache { .. }
1066            | SerializeToJson { .. }
1067            | DeserializeFromJson { .. } => StatusCode::Internal,
1068
1069            NoLeader { .. } => StatusCode::TableUnavailable,
1070            ValueNotExist { .. }
1071            | ProcedurePoisonConflict { .. }
1072            | ProcedureStateReceiverNotFound { .. }
1073            | MissingColumnIds { .. }
1074            | MissingColumnInColumnMetadata { .. }
1075            | MismatchColumnId { .. }
1076            | ColumnMetadataConflicts { .. }
1077            | ColumnNotFound { .. }
1078            | ColumnIdMismatch { .. }
1079            | TimestampMismatch { .. } => StatusCode::Unexpected,
1080
1081            Unsupported { .. } => StatusCode::Unsupported,
1082            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1083
1084            SerdeJson { .. }
1085            | ParseOption { .. }
1086            | RouteInfoCorrupted { .. }
1087            | InvalidProtoMsg { .. }
1088            | InvalidMetadata { .. }
1089            | Unexpected { .. }
1090            | TableInfoNotFound { .. }
1091            | NextSequence { .. }
1092            | UnexpectedSequenceValue { .. }
1093            | InvalidHeartbeatResponse { .. }
1094            | EncodeJson { .. }
1095            | DecodeJson { .. }
1096            | PayloadNotExist { .. }
1097            | ConvertRawKey { .. }
1098            | DecodeProto { .. }
1099            | BuildTableMeta { .. }
1100            | TableRouteNotFound { .. }
1101            | TableRepartNotFound { .. }
1102            | RegionOperatingRace { .. }
1103            | EncodeWalOptions { .. }
1104            | BuildKafkaClient { .. }
1105            | BuildKafkaCtrlClient { .. }
1106            | KafkaPartitionClient { .. }
1107            | ProduceRecord { .. }
1108            | CreateKafkaWalTopic { .. }
1109            | EmptyTopicPool { .. }
1110            | UnexpectedLogicalRouteTable { .. }
1111            | ProcedureOutput { .. }
1112            | FromUtf8 { .. }
1113            | MetadataCorruption { .. }
1114            | ParseWalOptions { .. }
1115            | KafkaGetOffset { .. }
1116            | ReadFlexbuffers { .. }
1117            | SerializeFlexbuffers { .. }
1118            | DeserializeFlexbuffers { .. }
1119            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1120
1121            GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1122
1123            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1124
1125            ProcedureNotFound { .. }
1126            | InvalidViewInfo { .. }
1127            | PrimaryKeyNotFound { .. }
1128            | EmptyKey { .. }
1129            | AlterLogicalTablesInvalidArguments { .. }
1130            | CreateLogicalTablesInvalidArguments { .. }
1131            | MismatchPrefix { .. }
1132            | TlsConfig { .. }
1133            | InvalidSetDatabaseOption { .. }
1134            | InvalidUnsetDatabaseOption { .. }
1135            | InvalidTopicNamePrefix { .. }
1136            | InvalidFileExtension { .. }
1137            | InvalidFileName { .. }
1138            | InvalidFlowRequestBody { .. }
1139            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1140
1141            #[cfg(feature = "enterprise")]
1142            MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1143                StatusCode::InvalidArguments
1144            }
1145
1146            FlowNotFound { .. } => StatusCode::FlowNotFound,
1147            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1148            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1149
1150            ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1151                StatusCode::TableNotFound
1152            }
1153            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1154
1155            SubmitProcedure { source, .. }
1156            | QueryProcedure { source, .. }
1157            | WaitProcedure { source, .. }
1158            | StartProcedureManager { source, .. }
1159            | StopProcedureManager { source, .. } => source.status_code(),
1160            RegisterProcedureLoader { source, .. } => source.status_code(),
1161            External { source, .. } => source.status_code(),
1162            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1163            OperateDatanode { source, .. } => source.status_code(),
1164            Table { source, .. } => source.status_code(),
1165            RetryLater { source, .. } => source.status_code(),
1166            AbortProcedure { source, .. } => source.status_code(),
1167            ConvertAlterTableRequest { source, .. } => source.status_code(),
1168            PutPoison { source, .. } => source.status_code(),
1169            ConvertColumnDef { source, .. } => source.status_code(),
1170            ProcedureStateReceiver { source, .. } => source.status_code(),
1171            RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
1172            CreateRepartitionProcedure { source, .. } => source.status_code(),
1173
1174            ParseProcedureId { .. }
1175            | InvalidNumTopics { .. }
1176            | SchemaNotFound { .. }
1177            | CatalogNotFound { .. }
1178            | InvalidNodeInfoKey { .. }
1179            | InvalidStatKey { .. }
1180            | ParseNum { .. }
1181            | InvalidRole { .. }
1182            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1183
1184            LoadTlsCertificate { .. } => StatusCode::Internal,
1185
1186            #[cfg(feature = "pg_kvbackend")]
1187            PostgresExecution { .. }
1188            | CreatePostgresPool { .. }
1189            | GetPostgresConnection { .. }
1190            | PostgresTransaction { .. }
1191            | PostgresTlsConfig { .. }
1192            | InvalidTlsConfig { .. } => StatusCode::Internal,
1193            #[cfg(feature = "mysql_kvbackend")]
1194            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1195                StatusCode::Internal
1196            }
1197            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1198            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1199            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1200        }
1201    }
1202
1203    fn as_any(&self) -> &dyn std::any::Any {
1204        self
1205    }
1206}
1207
1208impl Error {
1209    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1210    /// Check if the error is a serialization error.
1211    pub fn is_serialization_error(&self) -> bool {
1212        match self {
1213            #[cfg(feature = "pg_kvbackend")]
1214            Error::PostgresTransaction { error, .. } => {
1215                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1216            }
1217            #[cfg(feature = "pg_kvbackend")]
1218            Error::PostgresExecution { error, .. } => {
1219                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1220            }
1221            #[cfg(feature = "mysql_kvbackend")]
1222            Error::MySqlExecution {
1223                error: sqlx::Error::Database(database_error),
1224                ..
1225            } => {
1226                matches!(
1227                    database_error.message(),
1228                    "Deadlock found when trying to get lock; try restarting transaction"
1229                        | "can't serialize access for this transaction"
1230                )
1231            }
1232            _ => false,
1233        }
1234    }
1235
1236    /// Creates a new [Error::RetryLater] error from source `err`.
1237    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1238        Error::RetryLater {
1239            source: BoxedError::new(err),
1240            clean_poisons: false,
1241        }
1242    }
1243
1244    /// Determine whether it is a retry later type through [StatusCode]
1245    pub fn is_retry_later(&self) -> bool {
1246        matches!(self, Error::RetryLater { .. })
1247    }
1248
1249    /// Determine whether it needs to clean poisons.
1250    pub fn need_clean_poisons(&self) -> bool {
1251        matches!(
1252            self,
1253            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1254        ) || matches!(
1255            self,
1256            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1257        )
1258    }
1259
1260    /// Returns true if the response exceeds the size limit.
1261    pub fn is_exceeded_size_limit(&self) -> bool {
1262        match self {
1263            Error::EtcdFailed {
1264                error: etcd_client::Error::GRpcStatus(status),
1265                ..
1266            } => status.code() == tonic::Code::OutOfRange,
1267            Error::ResponseExceededSizeLimit { .. } => true,
1268            _ => false,
1269        }
1270    }
1271}