1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::peer::Peer;
29use crate::DatanodeId;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Empty key is not allowed"))]
36 EmptyKey {
37 #[snafu(implicit)]
38 location: Location,
39 },
40
41 #[snafu(display(
42 "Another procedure is operating the region: {} on peer: {}",
43 region_id,
44 peer_id
45 ))]
46 RegionOperatingRace {
47 #[snafu(implicit)]
48 location: Location,
49 peer_id: DatanodeId,
50 region_id: RegionId,
51 },
52
53 #[snafu(display("Failed to connect to Etcd"))]
54 ConnectEtcd {
55 #[snafu(source)]
56 error: etcd_client::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to execute via Etcd"))]
62 EtcdFailed {
63 #[snafu(source)]
64 error: etcd_client::Error,
65 #[snafu(implicit)]
66 location: Location,
67 },
68
69 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70 EtcdTxnFailed {
71 max_operations: usize,
72 #[snafu(source)]
73 error: etcd_client::Error,
74 #[snafu(implicit)]
75 location: Location,
76 },
77
78 #[snafu(display("Failed to get sequence: {}", err_msg))]
79 NextSequence {
80 err_msg: String,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Unexpected sequence value: {}", err_msg))]
86 UnexpectedSequenceValue {
87 err_msg: String,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Table info not found: {}", table))]
93 TableInfoNotFound {
94 table: String,
95 #[snafu(implicit)]
96 location: Location,
97 },
98
99 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100 RegisterProcedureLoader {
101 type_name: String,
102 #[snafu(implicit)]
103 location: Location,
104 source: common_procedure::error::Error,
105 },
106
107 #[snafu(display("Failed to submit procedure"))]
108 SubmitProcedure {
109 #[snafu(implicit)]
110 location: Location,
111 source: common_procedure::Error,
112 },
113
114 #[snafu(display("Failed to query procedure"))]
115 QueryProcedure {
116 #[snafu(implicit)]
117 location: Location,
118 source: common_procedure::Error,
119 },
120
121 #[snafu(display("Procedure not found: {pid}"))]
122 ProcedureNotFound {
123 #[snafu(implicit)]
124 location: Location,
125 pid: String,
126 },
127
128 #[snafu(display("Failed to parse procedure id: {key}"))]
129 ParseProcedureId {
130 #[snafu(implicit)]
131 location: Location,
132 key: String,
133 #[snafu(source)]
134 error: common_procedure::ParseIdError,
135 },
136
137 #[snafu(display("Unsupported operation {}", operation))]
138 Unsupported {
139 operation: String,
140 #[snafu(implicit)]
141 location: Location,
142 },
143
144 #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145 ProcedureStateReceiver {
146 procedure_id: ProcedureId,
147 #[snafu(implicit)]
148 location: Location,
149 source: common_procedure::Error,
150 },
151
152 #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153 ProcedureStateReceiverNotFound {
154 procedure_id: ProcedureId,
155 #[snafu(implicit)]
156 location: Location,
157 },
158
159 #[snafu(display("Failed to wait procedure done"))]
160 WaitProcedure {
161 #[snafu(implicit)]
162 location: Location,
163 source: common_procedure::Error,
164 },
165
166 #[snafu(display("Failed to start procedure manager"))]
167 StartProcedureManager {
168 #[snafu(implicit)]
169 location: Location,
170 source: common_procedure::Error,
171 },
172
173 #[snafu(display("Failed to stop procedure manager"))]
174 StopProcedureManager {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_procedure::Error,
178 },
179
180 #[snafu(display(
181 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182 ))]
183 ProcedureOutput {
184 procedure_id: String,
185 err_msg: String,
186 #[snafu(implicit)]
187 location: Location,
188 },
189
190 #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191 ConvertRawTableInfo {
192 #[snafu(implicit)]
193 location: Location,
194 source: datatypes::Error,
195 },
196
197 #[snafu(display("Primary key '{key}' not found when creating region request"))]
198 PrimaryKeyNotFound {
199 key: String,
200 #[snafu(implicit)]
201 location: Location,
202 },
203
204 #[snafu(display("Failed to build table meta for table: {}", table_name))]
205 BuildTableMeta {
206 table_name: String,
207 #[snafu(source)]
208 error: table::metadata::TableMetaBuilderError,
209 #[snafu(implicit)]
210 location: Location,
211 },
212
213 #[snafu(display("Table occurs error"))]
214 Table {
215 #[snafu(implicit)]
216 location: Location,
217 source: table::error::Error,
218 },
219
220 #[snafu(display("Failed to find table route for table id {}", table_id))]
221 TableRouteNotFound {
222 table_id: TableId,
223 #[snafu(implicit)]
224 location: Location,
225 },
226
227 #[snafu(display("Failed to decode protobuf"))]
228 DecodeProto {
229 #[snafu(implicit)]
230 location: Location,
231 #[snafu(source)]
232 error: prost::DecodeError,
233 },
234
235 #[snafu(display("Failed to encode object into json"))]
236 EncodeJson {
237 #[snafu(implicit)]
238 location: Location,
239 #[snafu(source)]
240 error: JsonError,
241 },
242
243 #[snafu(display("Failed to decode object from json"))]
244 DecodeJson {
245 #[snafu(implicit)]
246 location: Location,
247 #[snafu(source)]
248 error: JsonError,
249 },
250
251 #[snafu(display("Failed to serialize to json: {}", input))]
252 SerializeToJson {
253 input: String,
254 #[snafu(source)]
255 error: serde_json::error::Error,
256 #[snafu(implicit)]
257 location: Location,
258 },
259
260 #[snafu(display("Failed to deserialize from json: {}", input))]
261 DeserializeFromJson {
262 input: String,
263 #[snafu(source)]
264 error: serde_json::error::Error,
265 #[snafu(implicit)]
266 location: Location,
267 },
268
269 #[snafu(display("Payload not exist"))]
270 PayloadNotExist {
271 #[snafu(implicit)]
272 location: Location,
273 },
274
275 #[snafu(display("Failed to send message: {err_msg}"))]
276 SendMessage {
277 err_msg: String,
278 #[snafu(implicit)]
279 location: Location,
280 },
281
282 #[snafu(display("Failed to serde json"))]
283 SerdeJson {
284 #[snafu(source)]
285 error: serde_json::error::Error,
286 #[snafu(implicit)]
287 location: Location,
288 },
289
290 #[snafu(display("Failed to parse value {} into key {}", value, key))]
291 ParseOption {
292 key: String,
293 value: String,
294 #[snafu(implicit)]
295 location: Location,
296 },
297
298 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
299 RouteInfoCorrupted {
300 err_msg: String,
301 #[snafu(implicit)]
302 location: Location,
303 },
304
305 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
306 IllegalServerState {
307 code: i32,
308 err_msg: String,
309 #[snafu(implicit)]
310 location: Location,
311 },
312
313 #[snafu(display("Failed to convert alter table request"))]
314 ConvertAlterTableRequest {
315 source: common_grpc_expr::error::Error,
316 #[snafu(implicit)]
317 location: Location,
318 },
319
320 #[snafu(display("Invalid protobuf message: {err_msg}"))]
321 InvalidProtoMsg {
322 err_msg: String,
323 #[snafu(implicit)]
324 location: Location,
325 },
326
327 #[snafu(display("Unexpected: {err_msg}"))]
328 Unexpected {
329 err_msg: String,
330 #[snafu(implicit)]
331 location: Location,
332 },
333
334 #[snafu(display("Table already exists, table: {}", table_name))]
335 TableAlreadyExists {
336 table_name: String,
337 #[snafu(implicit)]
338 location: Location,
339 },
340
341 #[snafu(display("View already exists, view: {}", view_name))]
342 ViewAlreadyExists {
343 view_name: String,
344 #[snafu(implicit)]
345 location: Location,
346 },
347
348 #[snafu(display("Flow already exists: {}", flow_name))]
349 FlowAlreadyExists {
350 flow_name: String,
351 #[snafu(implicit)]
352 location: Location,
353 },
354
355 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
356 SchemaAlreadyExists {
357 catalog: String,
358 schema: String,
359 #[snafu(implicit)]
360 location: Location,
361 },
362
363 #[snafu(display("Failed to convert raw key to str"))]
364 ConvertRawKey {
365 #[snafu(implicit)]
366 location: Location,
367 #[snafu(source)]
368 error: Utf8Error,
369 },
370
371 #[snafu(display("Table not found: '{}'", table_name))]
372 TableNotFound {
373 table_name: String,
374 #[snafu(implicit)]
375 location: Location,
376 },
377
378 #[snafu(display("View not found: '{}'", view_name))]
379 ViewNotFound {
380 view_name: String,
381 #[snafu(implicit)]
382 location: Location,
383 },
384
385 #[snafu(display("Flow not found: '{}'", flow_name))]
386 FlowNotFound {
387 flow_name: String,
388 #[snafu(implicit)]
389 location: Location,
390 },
391
392 #[snafu(display("Flow route not found: '{}'", flow_name))]
393 FlowRouteNotFound {
394 flow_name: String,
395 #[snafu(implicit)]
396 location: Location,
397 },
398
399 #[snafu(display("Schema nod found, schema: {}", table_schema))]
400 SchemaNotFound {
401 table_schema: String,
402 #[snafu(implicit)]
403 location: Location,
404 },
405
406 #[snafu(display("Catalog not found, catalog: {}", catalog))]
407 CatalogNotFound {
408 catalog: String,
409 #[snafu(implicit)]
410 location: Location,
411 },
412
413 #[snafu(display("Invalid metadata, err: {}", err_msg))]
414 InvalidMetadata {
415 err_msg: String,
416 #[snafu(implicit)]
417 location: Location,
418 },
419
420 #[snafu(display("Invalid view info, err: {}", err_msg))]
421 InvalidViewInfo {
422 err_msg: String,
423 #[snafu(implicit)]
424 location: Location,
425 },
426
427 #[snafu(display("Invalid flow request body: {:?}", body))]
428 InvalidFlowRequestBody {
429 body: Box<Option<api::v1::flow::flow_request::Body>>,
430 #[snafu(implicit)]
431 location: Location,
432 },
433
434 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
435 GetKvCache { err_msg: String },
436
437 #[snafu(display("Get null from cache, key: {}", key))]
438 CacheNotGet {
439 key: String,
440 #[snafu(implicit)]
441 location: Location,
442 },
443
444 #[snafu(display("Etcd txn error: {err_msg}"))]
445 EtcdTxnOpResponse {
446 err_msg: String,
447 #[snafu(implicit)]
448 location: Location,
449 },
450
451 #[snafu(display("External error"))]
452 External {
453 #[snafu(implicit)]
454 location: Location,
455 source: BoxedError,
456 },
457
458 #[snafu(display("The response exceeded size limit"))]
459 ResponseExceededSizeLimit {
460 #[snafu(implicit)]
461 location: Location,
462 source: BoxedError,
463 },
464
465 #[snafu(display("Invalid heartbeat response"))]
466 InvalidHeartbeatResponse {
467 #[snafu(implicit)]
468 location: Location,
469 },
470
471 #[snafu(display("Failed to operate on datanode: {}", peer))]
472 OperateDatanode {
473 #[snafu(implicit)]
474 location: Location,
475 peer: Peer,
476 source: BoxedError,
477 },
478
479 #[snafu(display("Retry later"))]
480 RetryLater {
481 source: BoxedError,
482 clean_poisons: bool,
483 },
484
485 #[snafu(display("Abort procedure"))]
486 AbortProcedure {
487 #[snafu(implicit)]
488 location: Location,
489 source: BoxedError,
490 clean_poisons: bool,
491 },
492
493 #[snafu(display(
494 "Failed to encode a wal options to json string, wal_options: {:?}",
495 wal_options
496 ))]
497 EncodeWalOptions {
498 wal_options: WalOptions,
499 #[snafu(source)]
500 error: serde_json::Error,
501 #[snafu(implicit)]
502 location: Location,
503 },
504
505 #[snafu(display("Invalid number of topics {}", num_topics))]
506 InvalidNumTopics {
507 num_topics: usize,
508 #[snafu(implicit)]
509 location: Location,
510 },
511
512 #[snafu(display(
513 "Failed to build a Kafka client, broker endpoints: {:?}",
514 broker_endpoints
515 ))]
516 BuildKafkaClient {
517 broker_endpoints: Vec<String>,
518 #[snafu(implicit)]
519 location: Location,
520 #[snafu(source)]
521 error: rskafka::client::error::Error,
522 },
523
524 #[snafu(display("Failed to create TLS Config"))]
525 TlsConfig {
526 #[snafu(implicit)]
527 location: Location,
528 source: common_wal::error::Error,
529 },
530
531 #[snafu(display("Failed to resolve Kafka broker endpoint."))]
532 ResolveKafkaEndpoint { source: common_wal::error::Error },
533
534 #[snafu(display("Failed to build a Kafka controller client"))]
535 BuildKafkaCtrlClient {
536 #[snafu(implicit)]
537 location: Location,
538 #[snafu(source)]
539 error: rskafka::client::error::Error,
540 },
541
542 #[snafu(display(
543 "Failed to get a Kafka partition client, topic: {}, partition: {}",
544 topic,
545 partition
546 ))]
547 KafkaPartitionClient {
548 topic: String,
549 partition: i32,
550 #[snafu(implicit)]
551 location: Location,
552 #[snafu(source)]
553 error: rskafka::client::error::Error,
554 },
555
556 #[snafu(display(
557 "Failed to get offset from Kafka, topic: {}, partition: {}",
558 topic,
559 partition
560 ))]
561 KafkaGetOffset {
562 topic: String,
563 partition: i32,
564 #[snafu(implicit)]
565 location: Location,
566 #[snafu(source)]
567 error: rskafka::client::error::Error,
568 },
569
570 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
571 ProduceRecord {
572 topic: String,
573 #[snafu(implicit)]
574 location: Location,
575 #[snafu(source)]
576 error: rskafka::client::error::Error,
577 },
578
579 #[snafu(display("Failed to create a Kafka wal topic"))]
580 CreateKafkaWalTopic {
581 #[snafu(implicit)]
582 location: Location,
583 #[snafu(source)]
584 error: rskafka::client::error::Error,
585 },
586
587 #[snafu(display("The topic pool is empty"))]
588 EmptyTopicPool {
589 #[snafu(implicit)]
590 location: Location,
591 },
592
593 #[snafu(display("Unexpected table route type: {}", err_msg))]
594 UnexpectedLogicalRouteTable {
595 #[snafu(implicit)]
596 location: Location,
597 err_msg: String,
598 },
599
600 #[snafu(display("The tasks of {} cannot be empty", name))]
601 EmptyDdlTasks {
602 name: String,
603 #[snafu(implicit)]
604 location: Location,
605 },
606
607 #[snafu(display("Metadata corruption: {}", err_msg))]
608 MetadataCorruption {
609 err_msg: String,
610 #[snafu(implicit)]
611 location: Location,
612 },
613
614 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
615 AlterLogicalTablesInvalidArguments {
616 err_msg: String,
617 #[snafu(implicit)]
618 location: Location,
619 },
620
621 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
622 CreateLogicalTablesInvalidArguments {
623 err_msg: String,
624 #[snafu(implicit)]
625 location: Location,
626 },
627
628 #[snafu(display("Invalid node info key: {}", key))]
629 InvalidNodeInfoKey {
630 key: String,
631 #[snafu(implicit)]
632 location: Location,
633 },
634
635 #[snafu(display("Invalid node stat key: {}", key))]
636 InvalidStatKey {
637 key: String,
638 #[snafu(implicit)]
639 location: Location,
640 },
641
642 #[snafu(display("Failed to parse number: {}", err_msg))]
643 ParseNum {
644 err_msg: String,
645 #[snafu(source)]
646 error: std::num::ParseIntError,
647 #[snafu(implicit)]
648 location: Location,
649 },
650
651 #[snafu(display("Invalid role: {}", role))]
652 InvalidRole {
653 role: i32,
654 #[snafu(implicit)]
655 location: Location,
656 },
657
658 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
659 InvalidSetDatabaseOption {
660 key: String,
661 value: String,
662 #[snafu(implicit)]
663 location: Location,
664 },
665
666 #[snafu(display("Invalid unset database option, key: {}", key))]
667 InvalidUnsetDatabaseOption {
668 key: String,
669 #[snafu(implicit)]
670 location: Location,
671 },
672
673 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
674 MismatchPrefix {
675 prefix: String,
676 key: String,
677 #[snafu(implicit)]
678 location: Location,
679 },
680
681 #[snafu(display("Failed to move values: {err_msg}"))]
682 MoveValues {
683 err_msg: String,
684 #[snafu(implicit)]
685 location: Location,
686 },
687
688 #[snafu(display("Failed to parse {} from utf8", name))]
689 FromUtf8 {
690 name: String,
691 #[snafu(source)]
692 error: std::string::FromUtf8Error,
693 #[snafu(implicit)]
694 location: Location,
695 },
696
697 #[snafu(display("Value not exists"))]
698 ValueNotExist {
699 #[snafu(implicit)]
700 location: Location,
701 },
702
703 #[snafu(display("Failed to get cache"))]
704 GetCache { source: Arc<Error> },
705
706 #[cfg(feature = "pg_kvbackend")]
707 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
708 PostgresExecution {
709 sql: String,
710 #[snafu(source)]
711 error: tokio_postgres::Error,
712 #[snafu(implicit)]
713 location: Location,
714 },
715
716 #[cfg(feature = "pg_kvbackend")]
717 #[snafu(display("Failed to create connection pool for Postgres"))]
718 CreatePostgresPool {
719 #[snafu(source)]
720 error: deadpool_postgres::CreatePoolError,
721 #[snafu(implicit)]
722 location: Location,
723 },
724
725 #[cfg(feature = "pg_kvbackend")]
726 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
727 GetPostgresConnection {
728 reason: String,
729 #[snafu(implicit)]
730 location: Location,
731 },
732
733 #[cfg(feature = "pg_kvbackend")]
734 #[snafu(display("Failed to {} Postgres transaction", operation))]
735 PostgresTransaction {
736 #[snafu(source)]
737 error: tokio_postgres::Error,
738 #[snafu(implicit)]
739 location: Location,
740 operation: String,
741 },
742
743 #[cfg(feature = "pg_kvbackend")]
744 #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
745 PostgresTlsConfig {
746 reason: String,
747 #[snafu(implicit)]
748 location: Location,
749 },
750
751 #[cfg(feature = "pg_kvbackend")]
752 #[snafu(display("Failed to load TLS certificate from path: {}", path))]
753 LoadTlsCertificate {
754 path: String,
755 #[snafu(source)]
756 error: std::io::Error,
757 #[snafu(implicit)]
758 location: Location,
759 },
760
761 #[cfg(feature = "pg_kvbackend")]
762 #[snafu(display("Invalid TLS configuration: {}", reason))]
763 InvalidTlsConfig {
764 reason: String,
765 #[snafu(implicit)]
766 location: Location,
767 },
768
769 #[cfg(feature = "mysql_kvbackend")]
770 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
771 MySqlExecution {
772 sql: String,
773 #[snafu(source)]
774 error: sqlx::Error,
775 #[snafu(implicit)]
776 location: Location,
777 },
778
779 #[cfg(feature = "mysql_kvbackend")]
780 #[snafu(display("Failed to create connection pool for MySql"))]
781 CreateMySqlPool {
782 #[snafu(source)]
783 error: sqlx::Error,
784 #[snafu(implicit)]
785 location: Location,
786 },
787
788 #[cfg(feature = "mysql_kvbackend")]
789 #[snafu(display("Failed to {} MySql transaction", operation))]
790 MySqlTransaction {
791 #[snafu(source)]
792 error: sqlx::Error,
793 #[snafu(implicit)]
794 location: Location,
795 operation: String,
796 },
797
798 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
799 #[snafu(display("Rds transaction retry failed"))]
800 RdsTransactionRetryFailed {
801 #[snafu(implicit)]
802 location: Location,
803 },
804
805 #[snafu(display(
806 "Datanode table info not found, table id: {}, datanode id: {}",
807 table_id,
808 datanode_id
809 ))]
810 DatanodeTableInfoNotFound {
811 datanode_id: DatanodeId,
812 table_id: TableId,
813 #[snafu(implicit)]
814 location: Location,
815 },
816
817 #[snafu(display("Invalid topic name prefix: {}", prefix))]
818 InvalidTopicNamePrefix {
819 prefix: String,
820 #[snafu(implicit)]
821 location: Location,
822 },
823
824 #[snafu(display("Failed to parse wal options: {}", wal_options))]
825 ParseWalOptions {
826 wal_options: String,
827 #[snafu(implicit)]
828 location: Location,
829 #[snafu(source)]
830 error: serde_json::Error,
831 },
832
833 #[snafu(display("No leader found for table_id: {}", table_id))]
834 NoLeader {
835 table_id: TableId,
836 #[snafu(implicit)]
837 location: Location,
838 },
839
840 #[snafu(display(
841 "Procedure poison key already exists with a different value, key: {}, value: {}",
842 key,
843 value
844 ))]
845 ProcedurePoisonConflict {
846 key: String,
847 value: String,
848 #[snafu(implicit)]
849 location: Location,
850 },
851
852 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
853 PutPoison {
854 #[snafu(implicit)]
855 location: Location,
856 #[snafu(source)]
857 source: common_procedure::error::Error,
858 },
859
860 #[snafu(display("Failed to parse timezone"))]
861 InvalidTimeZone {
862 #[snafu(implicit)]
863 location: Location,
864 #[snafu(source)]
865 error: common_time::error::Error,
866 },
867 #[snafu(display("Invalid file path: {}", file_path))]
868 InvalidFilePath {
869 #[snafu(implicit)]
870 location: Location,
871 file_path: String,
872 },
873
874 #[snafu(display("Failed to serialize flexbuffers"))]
875 SerializeFlexbuffers {
876 #[snafu(implicit)]
877 location: Location,
878 #[snafu(source)]
879 error: flexbuffers::SerializationError,
880 },
881
882 #[snafu(display("Failed to deserialize flexbuffers"))]
883 DeserializeFlexbuffers {
884 #[snafu(implicit)]
885 location: Location,
886 #[snafu(source)]
887 error: flexbuffers::DeserializationError,
888 },
889
890 #[snafu(display("Failed to read flexbuffers"))]
891 ReadFlexbuffers {
892 #[snafu(implicit)]
893 location: Location,
894 #[snafu(source)]
895 error: flexbuffers::ReaderError,
896 },
897
898 #[snafu(display("Invalid file name: {}", reason))]
899 InvalidFileName {
900 #[snafu(implicit)]
901 location: Location,
902 reason: String,
903 },
904
905 #[snafu(display("Invalid file extension: {}", reason))]
906 InvalidFileExtension {
907 #[snafu(implicit)]
908 location: Location,
909 reason: String,
910 },
911
912 #[snafu(display("Failed to write object, file path: {}", file_path))]
913 WriteObject {
914 #[snafu(implicit)]
915 location: Location,
916 file_path: String,
917 #[snafu(source)]
918 error: object_store::Error,
919 },
920
921 #[snafu(display("Failed to read object, file path: {}", file_path))]
922 ReadObject {
923 #[snafu(implicit)]
924 location: Location,
925 file_path: String,
926 #[snafu(source)]
927 error: object_store::Error,
928 },
929
930 #[snafu(display("Missing column ids"))]
931 MissingColumnIds {
932 #[snafu(implicit)]
933 location: Location,
934 },
935
936 #[snafu(display(
937 "Missing column in column metadata: {}, table: {}, table_id: {}",
938 column_name,
939 table_name,
940 table_id,
941 ))]
942 MissingColumnInColumnMetadata {
943 column_name: String,
944 #[snafu(implicit)]
945 location: Location,
946 table_name: String,
947 table_id: TableId,
948 },
949
950 #[snafu(display(
951 "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
952 column_name,
953 column_id,
954 table_name,
955 table_id,
956 ))]
957 MismatchColumnId {
958 column_name: String,
959 column_id: u32,
960 #[snafu(implicit)]
961 location: Location,
962 table_name: String,
963 table_id: TableId,
964 },
965
966 #[snafu(display("Failed to convert column def, column: {}", column))]
967 ConvertColumnDef {
968 column: String,
969 #[snafu(implicit)]
970 location: Location,
971 source: api::error::Error,
972 },
973
974 #[snafu(display("Failed to convert time ranges"))]
975 ConvertTimeRanges {
976 #[snafu(implicit)]
977 location: Location,
978 source: api::error::Error,
979 },
980
981 #[snafu(display(
982 "Column metadata inconsistencies found in table: {}, table_id: {}",
983 table_name,
984 table_id
985 ))]
986 ColumnMetadataConflicts {
987 table_name: String,
988 table_id: TableId,
989 },
990}
991
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
993
994impl ErrorExt for Error {
995 fn status_code(&self) -> StatusCode {
996 use Error::*;
997 match self {
998 IllegalServerState { .. }
999 | EtcdTxnOpResponse { .. }
1000 | EtcdFailed { .. }
1001 | EtcdTxnFailed { .. }
1002 | ConnectEtcd { .. }
1003 | MoveValues { .. }
1004 | GetCache { .. }
1005 | SerializeToJson { .. }
1006 | DeserializeFromJson { .. } => StatusCode::Internal,
1007
1008 NoLeader { .. } => StatusCode::TableUnavailable,
1009 ValueNotExist { .. }
1010 | ProcedurePoisonConflict { .. }
1011 | ProcedureStateReceiverNotFound { .. }
1012 | MissingColumnIds { .. }
1013 | MissingColumnInColumnMetadata { .. }
1014 | MismatchColumnId { .. }
1015 | ColumnMetadataConflicts { .. } => StatusCode::Unexpected,
1016
1017 Unsupported { .. } => StatusCode::Unsupported,
1018 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1019
1020 SerdeJson { .. }
1021 | ParseOption { .. }
1022 | RouteInfoCorrupted { .. }
1023 | InvalidProtoMsg { .. }
1024 | InvalidMetadata { .. }
1025 | Unexpected { .. }
1026 | TableInfoNotFound { .. }
1027 | NextSequence { .. }
1028 | UnexpectedSequenceValue { .. }
1029 | InvalidHeartbeatResponse { .. }
1030 | EncodeJson { .. }
1031 | DecodeJson { .. }
1032 | PayloadNotExist { .. }
1033 | ConvertRawKey { .. }
1034 | DecodeProto { .. }
1035 | BuildTableMeta { .. }
1036 | TableRouteNotFound { .. }
1037 | ConvertRawTableInfo { .. }
1038 | RegionOperatingRace { .. }
1039 | EncodeWalOptions { .. }
1040 | BuildKafkaClient { .. }
1041 | BuildKafkaCtrlClient { .. }
1042 | KafkaPartitionClient { .. }
1043 | ResolveKafkaEndpoint { .. }
1044 | ProduceRecord { .. }
1045 | CreateKafkaWalTopic { .. }
1046 | EmptyTopicPool { .. }
1047 | UnexpectedLogicalRouteTable { .. }
1048 | ProcedureOutput { .. }
1049 | FromUtf8 { .. }
1050 | MetadataCorruption { .. }
1051 | ParseWalOptions { .. }
1052 | KafkaGetOffset { .. }
1053 | ReadFlexbuffers { .. }
1054 | SerializeFlexbuffers { .. }
1055 | DeserializeFlexbuffers { .. }
1056 | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1057
1058 SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1059
1060 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1061
1062 ProcedureNotFound { .. }
1063 | InvalidViewInfo { .. }
1064 | PrimaryKeyNotFound { .. }
1065 | EmptyKey { .. }
1066 | AlterLogicalTablesInvalidArguments { .. }
1067 | CreateLogicalTablesInvalidArguments { .. }
1068 | MismatchPrefix { .. }
1069 | TlsConfig { .. }
1070 | InvalidSetDatabaseOption { .. }
1071 | InvalidUnsetDatabaseOption { .. }
1072 | InvalidTopicNamePrefix { .. }
1073 | InvalidTimeZone { .. }
1074 | InvalidFileExtension { .. }
1075 | InvalidFileName { .. }
1076 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1077 InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
1078
1079 FlowNotFound { .. } => StatusCode::FlowNotFound,
1080 FlowRouteNotFound { .. } => StatusCode::Unexpected,
1081 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1082
1083 ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
1084 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1085
1086 SubmitProcedure { source, .. }
1087 | QueryProcedure { source, .. }
1088 | WaitProcedure { source, .. }
1089 | StartProcedureManager { source, .. }
1090 | StopProcedureManager { source, .. } => source.status_code(),
1091 RegisterProcedureLoader { source, .. } => source.status_code(),
1092 External { source, .. } => source.status_code(),
1093 ResponseExceededSizeLimit { source, .. } => source.status_code(),
1094 OperateDatanode { source, .. } => source.status_code(),
1095 Table { source, .. } => source.status_code(),
1096 RetryLater { source, .. } => source.status_code(),
1097 AbortProcedure { source, .. } => source.status_code(),
1098 ConvertAlterTableRequest { source, .. } => source.status_code(),
1099 PutPoison { source, .. } => source.status_code(),
1100 ConvertColumnDef { source, .. } => source.status_code(),
1101 ProcedureStateReceiver { source, .. } => source.status_code(),
1102
1103 ParseProcedureId { .. }
1104 | InvalidNumTopics { .. }
1105 | SchemaNotFound { .. }
1106 | CatalogNotFound { .. }
1107 | InvalidNodeInfoKey { .. }
1108 | InvalidStatKey { .. }
1109 | ParseNum { .. }
1110 | InvalidRole { .. }
1111 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1112
1113 #[cfg(feature = "pg_kvbackend")]
1114 PostgresExecution { .. }
1115 | CreatePostgresPool { .. }
1116 | GetPostgresConnection { .. }
1117 | PostgresTransaction { .. }
1118 | PostgresTlsConfig { .. }
1119 | LoadTlsCertificate { .. }
1120 | InvalidTlsConfig { .. } => StatusCode::Internal,
1121 #[cfg(feature = "mysql_kvbackend")]
1122 MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1123 StatusCode::Internal
1124 }
1125 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1126 RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1127 DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1128 }
1129 }
1130
1131 fn as_any(&self) -> &dyn std::any::Any {
1132 self
1133 }
1134}
1135
1136impl Error {
1137 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1138 pub fn is_serialization_error(&self) -> bool {
1140 match self {
1141 #[cfg(feature = "pg_kvbackend")]
1142 Error::PostgresTransaction { error, .. } => {
1143 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1144 }
1145 #[cfg(feature = "pg_kvbackend")]
1146 Error::PostgresExecution { error, .. } => {
1147 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1148 }
1149 #[cfg(feature = "mysql_kvbackend")]
1150 Error::MySqlExecution {
1151 error: sqlx::Error::Database(database_error),
1152 ..
1153 } => {
1154 matches!(
1155 database_error.message(),
1156 "Deadlock found when trying to get lock; try restarting transaction"
1157 | "can't serialize access for this transaction"
1158 )
1159 }
1160 _ => false,
1161 }
1162 }
1163
1164 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1166 Error::RetryLater {
1167 source: BoxedError::new(err),
1168 clean_poisons: false,
1169 }
1170 }
1171
1172 pub fn is_retry_later(&self) -> bool {
1174 matches!(self, Error::RetryLater { .. })
1175 }
1176
1177 pub fn need_clean_poisons(&self) -> bool {
1179 matches!(
1180 self,
1181 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1182 ) || matches!(
1183 self,
1184 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1185 )
1186 }
1187
1188 pub fn is_exceeded_size_limit(&self) -> bool {
1190 match self {
1191 Error::EtcdFailed {
1192 error: etcd_client::Error::GRpcStatus(status),
1193 ..
1194 } => status.code() == tonic::Code::OutOfRange,
1195 Error::ResponseExceededSizeLimit { .. } => true,
1196 _ => false,
1197 }
1198 }
1199}