1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_wal::options::WalOptions;
22use serde_json::error::Error as JsonError;
23use snafu::{Location, Snafu};
24use store_api::storage::RegionId;
25use table::metadata::TableId;
26
27use crate::peer::Peer;
28use crate::DatanodeId;
29
30#[derive(Snafu)]
31#[snafu(visibility(pub))]
32#[stack_trace_debug]
33pub enum Error {
34 #[snafu(display("Empty key is not allowed"))]
35 EmptyKey {
36 #[snafu(implicit)]
37 location: Location,
38 },
39
40 #[snafu(display(
41 "Another procedure is operating the region: {} on peer: {}",
42 region_id,
43 peer_id
44 ))]
45 RegionOperatingRace {
46 #[snafu(implicit)]
47 location: Location,
48 peer_id: DatanodeId,
49 region_id: RegionId,
50 },
51
52 #[snafu(display("Failed to connect to Etcd"))]
53 ConnectEtcd {
54 #[snafu(source)]
55 error: etcd_client::Error,
56 #[snafu(implicit)]
57 location: Location,
58 },
59
60 #[snafu(display("Failed to execute via Etcd"))]
61 EtcdFailed {
62 #[snafu(source)]
63 error: etcd_client::Error,
64 #[snafu(implicit)]
65 location: Location,
66 },
67
68 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
69 EtcdTxnFailed {
70 max_operations: usize,
71 #[snafu(source)]
72 error: etcd_client::Error,
73 #[snafu(implicit)]
74 location: Location,
75 },
76
77 #[snafu(display("Failed to get sequence: {}", err_msg))]
78 NextSequence {
79 err_msg: String,
80 #[snafu(implicit)]
81 location: Location,
82 },
83
84 #[snafu(display("Unexpected sequence value: {}", err_msg))]
85 UnexpectedSequenceValue {
86 err_msg: String,
87 #[snafu(implicit)]
88 location: Location,
89 },
90
91 #[snafu(display("Table info not found: {}", table))]
92 TableInfoNotFound {
93 table: String,
94 #[snafu(implicit)]
95 location: Location,
96 },
97
98 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
99 RegisterProcedureLoader {
100 type_name: String,
101 #[snafu(implicit)]
102 location: Location,
103 source: common_procedure::error::Error,
104 },
105
106 #[snafu(display("Failed to submit procedure"))]
107 SubmitProcedure {
108 #[snafu(implicit)]
109 location: Location,
110 source: common_procedure::Error,
111 },
112
113 #[snafu(display("Failed to query procedure"))]
114 QueryProcedure {
115 #[snafu(implicit)]
116 location: Location,
117 source: common_procedure::Error,
118 },
119
120 #[snafu(display("Procedure not found: {pid}"))]
121 ProcedureNotFound {
122 #[snafu(implicit)]
123 location: Location,
124 pid: String,
125 },
126
127 #[snafu(display("Failed to parse procedure id: {key}"))]
128 ParseProcedureId {
129 #[snafu(implicit)]
130 location: Location,
131 key: String,
132 #[snafu(source)]
133 error: common_procedure::ParseIdError,
134 },
135
136 #[snafu(display("Unsupported operation {}", operation))]
137 Unsupported {
138 operation: String,
139 #[snafu(implicit)]
140 location: Location,
141 },
142
143 #[snafu(display("Failed to wait procedure done"))]
144 WaitProcedure {
145 #[snafu(implicit)]
146 location: Location,
147 source: common_procedure::Error,
148 },
149
150 #[snafu(display("Failed to start procedure manager"))]
151 StartProcedureManager {
152 #[snafu(implicit)]
153 location: Location,
154 source: common_procedure::Error,
155 },
156
157 #[snafu(display("Failed to stop procedure manager"))]
158 StopProcedureManager {
159 #[snafu(implicit)]
160 location: Location,
161 source: common_procedure::Error,
162 },
163
164 #[snafu(display(
165 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
166 ))]
167 ProcedureOutput {
168 procedure_id: String,
169 err_msg: String,
170 #[snafu(implicit)]
171 location: Location,
172 },
173
174 #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
175 ConvertRawTableInfo {
176 #[snafu(implicit)]
177 location: Location,
178 source: datatypes::Error,
179 },
180
181 #[snafu(display("Primary key '{key}' not found when creating region request"))]
182 PrimaryKeyNotFound {
183 key: String,
184 #[snafu(implicit)]
185 location: Location,
186 },
187
188 #[snafu(display("Failed to build table meta for table: {}", table_name))]
189 BuildTableMeta {
190 table_name: String,
191 #[snafu(source)]
192 error: table::metadata::TableMetaBuilderError,
193 #[snafu(implicit)]
194 location: Location,
195 },
196
197 #[snafu(display("Table occurs error"))]
198 Table {
199 #[snafu(implicit)]
200 location: Location,
201 source: table::error::Error,
202 },
203
204 #[snafu(display("Failed to find table route for table id {}", table_id))]
205 TableRouteNotFound {
206 table_id: TableId,
207 #[snafu(implicit)]
208 location: Location,
209 },
210
211 #[snafu(display("Failed to decode protobuf"))]
212 DecodeProto {
213 #[snafu(implicit)]
214 location: Location,
215 #[snafu(source)]
216 error: prost::DecodeError,
217 },
218
219 #[snafu(display("Failed to encode object into json"))]
220 EncodeJson {
221 #[snafu(implicit)]
222 location: Location,
223 #[snafu(source)]
224 error: JsonError,
225 },
226
227 #[snafu(display("Failed to decode object from json"))]
228 DecodeJson {
229 #[snafu(implicit)]
230 location: Location,
231 #[snafu(source)]
232 error: JsonError,
233 },
234
235 #[snafu(display("Failed to serialize to json: {}", input))]
236 SerializeToJson {
237 input: String,
238 #[snafu(source)]
239 error: serde_json::error::Error,
240 #[snafu(implicit)]
241 location: Location,
242 },
243
244 #[snafu(display("Failed to deserialize from json: {}", input))]
245 DeserializeFromJson {
246 input: String,
247 #[snafu(source)]
248 error: serde_json::error::Error,
249 #[snafu(implicit)]
250 location: Location,
251 },
252
253 #[snafu(display("Payload not exist"))]
254 PayloadNotExist {
255 #[snafu(implicit)]
256 location: Location,
257 },
258
259 #[snafu(display("Failed to send message: {err_msg}"))]
260 SendMessage {
261 err_msg: String,
262 #[snafu(implicit)]
263 location: Location,
264 },
265
266 #[snafu(display("Failed to serde json"))]
267 SerdeJson {
268 #[snafu(source)]
269 error: serde_json::error::Error,
270 #[snafu(implicit)]
271 location: Location,
272 },
273
274 #[snafu(display("Failed to parse value {} into key {}", value, key))]
275 ParseOption {
276 key: String,
277 value: String,
278 #[snafu(implicit)]
279 location: Location,
280 },
281
282 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
283 RouteInfoCorrupted {
284 err_msg: String,
285 #[snafu(implicit)]
286 location: Location,
287 },
288
289 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
290 IllegalServerState {
291 code: i32,
292 err_msg: String,
293 #[snafu(implicit)]
294 location: Location,
295 },
296
297 #[snafu(display("Failed to convert alter table request"))]
298 ConvertAlterTableRequest {
299 source: common_grpc_expr::error::Error,
300 #[snafu(implicit)]
301 location: Location,
302 },
303
304 #[snafu(display("Invalid protobuf message: {err_msg}"))]
305 InvalidProtoMsg {
306 err_msg: String,
307 #[snafu(implicit)]
308 location: Location,
309 },
310
311 #[snafu(display("Unexpected: {err_msg}"))]
312 Unexpected {
313 err_msg: String,
314 #[snafu(implicit)]
315 location: Location,
316 },
317
318 #[snafu(display("Table already exists, table: {}", table_name))]
319 TableAlreadyExists {
320 table_name: String,
321 #[snafu(implicit)]
322 location: Location,
323 },
324
325 #[snafu(display("View already exists, view: {}", view_name))]
326 ViewAlreadyExists {
327 view_name: String,
328 #[snafu(implicit)]
329 location: Location,
330 },
331
332 #[snafu(display("Flow already exists: {}", flow_name))]
333 FlowAlreadyExists {
334 flow_name: String,
335 #[snafu(implicit)]
336 location: Location,
337 },
338
339 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
340 SchemaAlreadyExists {
341 catalog: String,
342 schema: String,
343 #[snafu(implicit)]
344 location: Location,
345 },
346
347 #[snafu(display("Failed to convert raw key to str"))]
348 ConvertRawKey {
349 #[snafu(implicit)]
350 location: Location,
351 #[snafu(source)]
352 error: Utf8Error,
353 },
354
355 #[snafu(display("Table not found: '{}'", table_name))]
356 TableNotFound {
357 table_name: String,
358 #[snafu(implicit)]
359 location: Location,
360 },
361
362 #[snafu(display("View not found: '{}'", view_name))]
363 ViewNotFound {
364 view_name: String,
365 #[snafu(implicit)]
366 location: Location,
367 },
368
369 #[snafu(display("Flow not found: '{}'", flow_name))]
370 FlowNotFound {
371 flow_name: String,
372 #[snafu(implicit)]
373 location: Location,
374 },
375
376 #[snafu(display("Flow route not found: '{}'", flow_name))]
377 FlowRouteNotFound {
378 flow_name: String,
379 #[snafu(implicit)]
380 location: Location,
381 },
382
383 #[snafu(display("Schema nod found, schema: {}", table_schema))]
384 SchemaNotFound {
385 table_schema: String,
386 #[snafu(implicit)]
387 location: Location,
388 },
389
390 #[snafu(display("Invalid metadata, err: {}", err_msg))]
391 InvalidMetadata {
392 err_msg: String,
393 #[snafu(implicit)]
394 location: Location,
395 },
396
397 #[snafu(display("Invalid view info, err: {}", err_msg))]
398 InvalidViewInfo {
399 err_msg: String,
400 #[snafu(implicit)]
401 location: Location,
402 },
403
404 #[snafu(display("Invalid flow request body: {:?}", body))]
405 InvalidFlowRequestBody {
406 body: Box<Option<api::v1::flow::flow_request::Body>>,
407 #[snafu(implicit)]
408 location: Location,
409 },
410
411 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
412 GetKvCache { err_msg: String },
413
414 #[snafu(display("Get null from cache, key: {}", key))]
415 CacheNotGet {
416 key: String,
417 #[snafu(implicit)]
418 location: Location,
419 },
420
421 #[snafu(display("Etcd txn error: {err_msg}"))]
422 EtcdTxnOpResponse {
423 err_msg: String,
424 #[snafu(implicit)]
425 location: Location,
426 },
427
428 #[snafu(display("External error"))]
429 External {
430 #[snafu(implicit)]
431 location: Location,
432 source: BoxedError,
433 },
434
435 #[snafu(display("The response exceeded size limit"))]
436 ResponseExceededSizeLimit {
437 #[snafu(implicit)]
438 location: Location,
439 source: BoxedError,
440 },
441
442 #[snafu(display("Invalid heartbeat response"))]
443 InvalidHeartbeatResponse {
444 #[snafu(implicit)]
445 location: Location,
446 },
447
448 #[snafu(display("Failed to operate on datanode: {}", peer))]
449 OperateDatanode {
450 #[snafu(implicit)]
451 location: Location,
452 peer: Peer,
453 source: BoxedError,
454 },
455
456 #[snafu(display("Retry later"))]
457 RetryLater {
458 source: BoxedError,
459 clean_poisons: bool,
460 },
461
462 #[snafu(display("Abort procedure"))]
463 AbortProcedure {
464 #[snafu(implicit)]
465 location: Location,
466 source: BoxedError,
467 clean_poisons: bool,
468 },
469
470 #[snafu(display(
471 "Failed to encode a wal options to json string, wal_options: {:?}",
472 wal_options
473 ))]
474 EncodeWalOptions {
475 wal_options: WalOptions,
476 #[snafu(source)]
477 error: serde_json::Error,
478 #[snafu(implicit)]
479 location: Location,
480 },
481
482 #[snafu(display("Invalid number of topics {}", num_topics))]
483 InvalidNumTopics {
484 num_topics: usize,
485 #[snafu(implicit)]
486 location: Location,
487 },
488
489 #[snafu(display(
490 "Failed to build a Kafka client, broker endpoints: {:?}",
491 broker_endpoints
492 ))]
493 BuildKafkaClient {
494 broker_endpoints: Vec<String>,
495 #[snafu(implicit)]
496 location: Location,
497 #[snafu(source)]
498 error: rskafka::client::error::Error,
499 },
500
501 #[snafu(display("Failed to create TLS Config"))]
502 TlsConfig {
503 #[snafu(implicit)]
504 location: Location,
505 source: common_wal::error::Error,
506 },
507
508 #[snafu(display("Failed to resolve Kafka broker endpoint."))]
509 ResolveKafkaEndpoint { source: common_wal::error::Error },
510
511 #[snafu(display("Failed to build a Kafka controller client"))]
512 BuildKafkaCtrlClient {
513 #[snafu(implicit)]
514 location: Location,
515 #[snafu(source)]
516 error: rskafka::client::error::Error,
517 },
518
519 #[snafu(display(
520 "Failed to get a Kafka partition client, topic: {}, partition: {}",
521 topic,
522 partition
523 ))]
524 KafkaPartitionClient {
525 topic: String,
526 partition: i32,
527 #[snafu(implicit)]
528 location: Location,
529 #[snafu(source)]
530 error: rskafka::client::error::Error,
531 },
532
533 #[snafu(display(
534 "Failed to get offset from Kafka, topic: {}, partition: {}",
535 topic,
536 partition
537 ))]
538 KafkaGetOffset {
539 topic: String,
540 partition: i32,
541 #[snafu(implicit)]
542 location: Location,
543 #[snafu(source)]
544 error: rskafka::client::error::Error,
545 },
546
547 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
548 ProduceRecord {
549 topic: String,
550 #[snafu(implicit)]
551 location: Location,
552 #[snafu(source)]
553 error: rskafka::client::error::Error,
554 },
555
556 #[snafu(display("Failed to create a Kafka wal topic"))]
557 CreateKafkaWalTopic {
558 #[snafu(implicit)]
559 location: Location,
560 #[snafu(source)]
561 error: rskafka::client::error::Error,
562 },
563
564 #[snafu(display("The topic pool is empty"))]
565 EmptyTopicPool {
566 #[snafu(implicit)]
567 location: Location,
568 },
569
570 #[snafu(display("Unexpected table route type: {}", err_msg))]
571 UnexpectedLogicalRouteTable {
572 #[snafu(implicit)]
573 location: Location,
574 err_msg: String,
575 },
576
577 #[snafu(display("The tasks of {} cannot be empty", name))]
578 EmptyDdlTasks {
579 name: String,
580 #[snafu(implicit)]
581 location: Location,
582 },
583
584 #[snafu(display("Metadata corruption: {}", err_msg))]
585 MetadataCorruption {
586 err_msg: String,
587 #[snafu(implicit)]
588 location: Location,
589 },
590
591 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
592 AlterLogicalTablesInvalidArguments {
593 err_msg: String,
594 #[snafu(implicit)]
595 location: Location,
596 },
597
598 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
599 CreateLogicalTablesInvalidArguments {
600 err_msg: String,
601 #[snafu(implicit)]
602 location: Location,
603 },
604
605 #[snafu(display("Invalid node info key: {}", key))]
606 InvalidNodeInfoKey {
607 key: String,
608 #[snafu(implicit)]
609 location: Location,
610 },
611
612 #[snafu(display("Invalid node stat key: {}", key))]
613 InvalidStatKey {
614 key: String,
615 #[snafu(implicit)]
616 location: Location,
617 },
618
619 #[snafu(display("Failed to parse number: {}", err_msg))]
620 ParseNum {
621 err_msg: String,
622 #[snafu(source)]
623 error: std::num::ParseIntError,
624 #[snafu(implicit)]
625 location: Location,
626 },
627
628 #[snafu(display("Invalid role: {}", role))]
629 InvalidRole {
630 role: i32,
631 #[snafu(implicit)]
632 location: Location,
633 },
634
635 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
636 InvalidSetDatabaseOption {
637 key: String,
638 value: String,
639 #[snafu(implicit)]
640 location: Location,
641 },
642
643 #[snafu(display("Invalid unset database option, key: {}", key))]
644 InvalidUnsetDatabaseOption {
645 key: String,
646 #[snafu(implicit)]
647 location: Location,
648 },
649
650 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
651 MismatchPrefix {
652 prefix: String,
653 key: String,
654 #[snafu(implicit)]
655 location: Location,
656 },
657
658 #[snafu(display("Failed to move values: {err_msg}"))]
659 MoveValues {
660 err_msg: String,
661 #[snafu(implicit)]
662 location: Location,
663 },
664
665 #[snafu(display("Failed to parse {} from utf8", name))]
666 FromUtf8 {
667 name: String,
668 #[snafu(source)]
669 error: std::string::FromUtf8Error,
670 #[snafu(implicit)]
671 location: Location,
672 },
673
674 #[snafu(display("Value not exists"))]
675 ValueNotExist {
676 #[snafu(implicit)]
677 location: Location,
678 },
679
680 #[snafu(display("Failed to get cache"))]
681 GetCache { source: Arc<Error> },
682
683 #[cfg(feature = "pg_kvbackend")]
684 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
685 PostgresExecution {
686 sql: String,
687 #[snafu(source)]
688 error: tokio_postgres::Error,
689 #[snafu(implicit)]
690 location: Location,
691 },
692
693 #[cfg(feature = "pg_kvbackend")]
694 #[snafu(display("Failed to create connection pool for Postgres"))]
695 CreatePostgresPool {
696 #[snafu(source)]
697 error: deadpool_postgres::CreatePoolError,
698 #[snafu(implicit)]
699 location: Location,
700 },
701
702 #[cfg(feature = "pg_kvbackend")]
703 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
704 GetPostgresConnection {
705 reason: String,
706 #[snafu(implicit)]
707 location: Location,
708 },
709
710 #[cfg(feature = "pg_kvbackend")]
711 #[snafu(display("Failed to {} Postgres transaction", operation))]
712 PostgresTransaction {
713 #[snafu(source)]
714 error: tokio_postgres::Error,
715 #[snafu(implicit)]
716 location: Location,
717 operation: String,
718 },
719
720 #[cfg(feature = "mysql_kvbackend")]
721 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
722 MySqlExecution {
723 sql: String,
724 #[snafu(source)]
725 error: sqlx::Error,
726 #[snafu(implicit)]
727 location: Location,
728 },
729
730 #[cfg(feature = "mysql_kvbackend")]
731 #[snafu(display("Failed to create connection pool for MySql"))]
732 CreateMySqlPool {
733 #[snafu(source)]
734 error: sqlx::Error,
735 #[snafu(implicit)]
736 location: Location,
737 },
738
739 #[cfg(feature = "mysql_kvbackend")]
740 #[snafu(display("Failed to {} MySql transaction", operation))]
741 MySqlTransaction {
742 #[snafu(source)]
743 error: sqlx::Error,
744 #[snafu(implicit)]
745 location: Location,
746 operation: String,
747 },
748
749 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
750 #[snafu(display("Rds transaction retry failed"))]
751 RdsTransactionRetryFailed {
752 #[snafu(implicit)]
753 location: Location,
754 },
755
756 #[snafu(display(
757 "Datanode table info not found, table id: {}, datanode id: {}",
758 table_id,
759 datanode_id
760 ))]
761 DatanodeTableInfoNotFound {
762 datanode_id: DatanodeId,
763 table_id: TableId,
764 #[snafu(implicit)]
765 location: Location,
766 },
767
768 #[snafu(display("Invalid topic name prefix: {}", prefix))]
769 InvalidTopicNamePrefix {
770 prefix: String,
771 #[snafu(implicit)]
772 location: Location,
773 },
774
775 #[snafu(display("Failed to parse wal options: {}", wal_options))]
776 ParseWalOptions {
777 wal_options: String,
778 #[snafu(implicit)]
779 location: Location,
780 #[snafu(source)]
781 error: serde_json::Error,
782 },
783
784 #[snafu(display("No leader found for table_id: {}", table_id))]
785 NoLeader {
786 table_id: TableId,
787 #[snafu(implicit)]
788 location: Location,
789 },
790
791 #[snafu(display(
792 "Procedure poison key already exists with a different value, key: {}, value: {}",
793 key,
794 value
795 ))]
796 ProcedurePoisonConflict {
797 key: String,
798 value: String,
799 #[snafu(implicit)]
800 location: Location,
801 },
802
803 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
804 PutPoison {
805 #[snafu(implicit)]
806 location: Location,
807 #[snafu(source)]
808 source: common_procedure::error::Error,
809 },
810
811 #[snafu(display("Failed to parse timezone"))]
812 InvalidTimeZone {
813 #[snafu(implicit)]
814 location: Location,
815 #[snafu(source)]
816 error: common_time::error::Error,
817 },
818 #[snafu(display("Invalid file path: {}", file_path))]
819 InvalidFilePath {
820 #[snafu(implicit)]
821 location: Location,
822 file_path: String,
823 },
824
825 #[snafu(display("Failed to serialize flexbuffers"))]
826 SerializeFlexbuffers {
827 #[snafu(implicit)]
828 location: Location,
829 #[snafu(source)]
830 error: flexbuffers::SerializationError,
831 },
832
833 #[snafu(display("Failed to deserialize flexbuffers"))]
834 DeserializeFlexbuffers {
835 #[snafu(implicit)]
836 location: Location,
837 #[snafu(source)]
838 error: flexbuffers::DeserializationError,
839 },
840
841 #[snafu(display("Failed to read flexbuffers"))]
842 ReadFlexbuffers {
843 #[snafu(implicit)]
844 location: Location,
845 #[snafu(source)]
846 error: flexbuffers::ReaderError,
847 },
848
849 #[snafu(display("Invalid file name: {}", reason))]
850 InvalidFileName {
851 #[snafu(implicit)]
852 location: Location,
853 reason: String,
854 },
855
856 #[snafu(display("Invalid file extension: {}", reason))]
857 InvalidFileExtension {
858 #[snafu(implicit)]
859 location: Location,
860 reason: String,
861 },
862
863 #[snafu(display("Failed to write object, file path: {}", file_path))]
864 WriteObject {
865 #[snafu(implicit)]
866 location: Location,
867 file_path: String,
868 #[snafu(source)]
869 error: object_store::Error,
870 },
871
872 #[snafu(display("Failed to read object, file path: {}", file_path))]
873 ReadObject {
874 #[snafu(implicit)]
875 location: Location,
876 file_path: String,
877 #[snafu(source)]
878 error: object_store::Error,
879 },
880}
881
/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
883
884impl ErrorExt for Error {
885 fn status_code(&self) -> StatusCode {
886 use Error::*;
887 match self {
888 IllegalServerState { .. }
889 | EtcdTxnOpResponse { .. }
890 | EtcdFailed { .. }
891 | EtcdTxnFailed { .. }
892 | ConnectEtcd { .. }
893 | MoveValues { .. }
894 | GetCache { .. }
895 | SerializeToJson { .. }
896 | DeserializeFromJson { .. } => StatusCode::Internal,
897
898 NoLeader { .. } => StatusCode::TableUnavailable,
899 ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
900
901 Unsupported { .. } => StatusCode::Unsupported,
902 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
903
904 SerdeJson { .. }
905 | ParseOption { .. }
906 | RouteInfoCorrupted { .. }
907 | InvalidProtoMsg { .. }
908 | InvalidMetadata { .. }
909 | Unexpected { .. }
910 | TableInfoNotFound { .. }
911 | NextSequence { .. }
912 | UnexpectedSequenceValue { .. }
913 | InvalidHeartbeatResponse { .. }
914 | EncodeJson { .. }
915 | DecodeJson { .. }
916 | PayloadNotExist { .. }
917 | ConvertRawKey { .. }
918 | DecodeProto { .. }
919 | BuildTableMeta { .. }
920 | TableRouteNotFound { .. }
921 | ConvertRawTableInfo { .. }
922 | RegionOperatingRace { .. }
923 | EncodeWalOptions { .. }
924 | BuildKafkaClient { .. }
925 | BuildKafkaCtrlClient { .. }
926 | KafkaPartitionClient { .. }
927 | ResolveKafkaEndpoint { .. }
928 | ProduceRecord { .. }
929 | CreateKafkaWalTopic { .. }
930 | EmptyTopicPool { .. }
931 | UnexpectedLogicalRouteTable { .. }
932 | ProcedureOutput { .. }
933 | FromUtf8 { .. }
934 | MetadataCorruption { .. }
935 | ParseWalOptions { .. }
936 | KafkaGetOffset { .. }
937 | ReadFlexbuffers { .. }
938 | SerializeFlexbuffers { .. }
939 | DeserializeFlexbuffers { .. } => StatusCode::Unexpected,
940
941 SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
942
943 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
944
945 ProcedureNotFound { .. }
946 | InvalidViewInfo { .. }
947 | PrimaryKeyNotFound { .. }
948 | EmptyKey { .. }
949 | AlterLogicalTablesInvalidArguments { .. }
950 | CreateLogicalTablesInvalidArguments { .. }
951 | MismatchPrefix { .. }
952 | TlsConfig { .. }
953 | InvalidSetDatabaseOption { .. }
954 | InvalidUnsetDatabaseOption { .. }
955 | InvalidTopicNamePrefix { .. }
956 | InvalidTimeZone { .. }
957 | InvalidFileExtension { .. }
958 | InvalidFileName { .. }
959 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
960 InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
961
962 FlowNotFound { .. } => StatusCode::FlowNotFound,
963 FlowRouteNotFound { .. } => StatusCode::Unexpected,
964 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
965
966 ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
967 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
968
969 SubmitProcedure { source, .. }
970 | QueryProcedure { source, .. }
971 | WaitProcedure { source, .. }
972 | StartProcedureManager { source, .. }
973 | StopProcedureManager { source, .. } => source.status_code(),
974 RegisterProcedureLoader { source, .. } => source.status_code(),
975 External { source, .. } => source.status_code(),
976 ResponseExceededSizeLimit { source, .. } => source.status_code(),
977 OperateDatanode { source, .. } => source.status_code(),
978 Table { source, .. } => source.status_code(),
979 RetryLater { source, .. } => source.status_code(),
980 AbortProcedure { source, .. } => source.status_code(),
981 ConvertAlterTableRequest { source, .. } => source.status_code(),
982 PutPoison { source, .. } => source.status_code(),
983
984 ParseProcedureId { .. }
985 | InvalidNumTopics { .. }
986 | SchemaNotFound { .. }
987 | InvalidNodeInfoKey { .. }
988 | InvalidStatKey { .. }
989 | ParseNum { .. }
990 | InvalidRole { .. }
991 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
992
993 #[cfg(feature = "pg_kvbackend")]
994 PostgresExecution { .. }
995 | CreatePostgresPool { .. }
996 | GetPostgresConnection { .. }
997 | PostgresTransaction { .. } => StatusCode::Internal,
998 #[cfg(feature = "mysql_kvbackend")]
999 MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1000 StatusCode::Internal
1001 }
1002 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1003 RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1004 Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1005 }
1006 }
1007
1008 fn as_any(&self) -> &dyn std::any::Any {
1009 self
1010 }
1011}
1012
1013impl Error {
1014 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1015 pub fn is_serialization_error(&self) -> bool {
1017 match self {
1018 #[cfg(feature = "pg_kvbackend")]
1019 Error::PostgresTransaction { error, .. } => {
1020 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1021 }
1022 #[cfg(feature = "pg_kvbackend")]
1023 Error::PostgresExecution { error, .. } => {
1024 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1025 }
1026 #[cfg(feature = "mysql_kvbackend")]
1027 Error::MySqlExecution {
1028 error: sqlx::Error::Database(database_error),
1029 ..
1030 } => {
1031 matches!(
1032 database_error.message(),
1033 "Deadlock found when trying to get lock; try restarting transaction"
1034 | "can't serialize access for this transaction"
1035 )
1036 }
1037 _ => false,
1038 }
1039 }
1040
1041 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1043 Error::RetryLater {
1044 source: BoxedError::new(err),
1045 clean_poisons: false,
1046 }
1047 }
1048
1049 pub fn is_retry_later(&self) -> bool {
1051 matches!(self, Error::RetryLater { .. })
1052 }
1053
1054 pub fn need_clean_poisons(&self) -> bool {
1056 matches!(
1057 self,
1058 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1059 ) || matches!(
1060 self,
1061 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1062 )
1063 }
1064
1065 pub fn is_exceeded_size_limit(&self) -> bool {
1067 match self {
1068 Error::EtcdFailed {
1069 error: etcd_client::Error::GRpcStatus(status),
1070 ..
1071 } => status.code() == tonic::Code::OutOfRange,
1072 Error::ResponseExceededSizeLimit { .. } => true,
1073 _ => false,
1074 }
1075 }
1076}