1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::peer::Peer;
29use crate::DatanodeId;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Empty key is not allowed"))]
36 EmptyKey {
37 #[snafu(implicit)]
38 location: Location,
39 },
40
41 #[snafu(display(
42 "Another procedure is operating the region: {} on peer: {}",
43 region_id,
44 peer_id
45 ))]
46 RegionOperatingRace {
47 #[snafu(implicit)]
48 location: Location,
49 peer_id: DatanodeId,
50 region_id: RegionId,
51 },
52
53 #[snafu(display("Failed to connect to Etcd"))]
54 ConnectEtcd {
55 #[snafu(source)]
56 error: etcd_client::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to execute via Etcd"))]
62 EtcdFailed {
63 #[snafu(source)]
64 error: etcd_client::Error,
65 #[snafu(implicit)]
66 location: Location,
67 },
68
69 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70 EtcdTxnFailed {
71 max_operations: usize,
72 #[snafu(source)]
73 error: etcd_client::Error,
74 #[snafu(implicit)]
75 location: Location,
76 },
77
78 #[snafu(display("Failed to get sequence: {}", err_msg))]
79 NextSequence {
80 err_msg: String,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Unexpected sequence value: {}", err_msg))]
86 UnexpectedSequenceValue {
87 err_msg: String,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Table info not found: {}", table))]
93 TableInfoNotFound {
94 table: String,
95 #[snafu(implicit)]
96 location: Location,
97 },
98
99 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100 RegisterProcedureLoader {
101 type_name: String,
102 #[snafu(implicit)]
103 location: Location,
104 source: common_procedure::error::Error,
105 },
106
107 #[snafu(display("Failed to submit procedure"))]
108 SubmitProcedure {
109 #[snafu(implicit)]
110 location: Location,
111 source: common_procedure::Error,
112 },
113
114 #[snafu(display("Failed to query procedure"))]
115 QueryProcedure {
116 #[snafu(implicit)]
117 location: Location,
118 source: common_procedure::Error,
119 },
120
121 #[snafu(display("Procedure not found: {pid}"))]
122 ProcedureNotFound {
123 #[snafu(implicit)]
124 location: Location,
125 pid: String,
126 },
127
128 #[snafu(display("Failed to parse procedure id: {key}"))]
129 ParseProcedureId {
130 #[snafu(implicit)]
131 location: Location,
132 key: String,
133 #[snafu(source)]
134 error: common_procedure::ParseIdError,
135 },
136
137 #[snafu(display("Unsupported operation {}", operation))]
138 Unsupported {
139 operation: String,
140 #[snafu(implicit)]
141 location: Location,
142 },
143
144 #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145 ProcedureStateReceiver {
146 procedure_id: ProcedureId,
147 #[snafu(implicit)]
148 location: Location,
149 source: common_procedure::Error,
150 },
151
152 #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153 ProcedureStateReceiverNotFound {
154 procedure_id: ProcedureId,
155 #[snafu(implicit)]
156 location: Location,
157 },
158
159 #[snafu(display("Failed to wait procedure done"))]
160 WaitProcedure {
161 #[snafu(implicit)]
162 location: Location,
163 source: common_procedure::Error,
164 },
165
166 #[snafu(display("Failed to start procedure manager"))]
167 StartProcedureManager {
168 #[snafu(implicit)]
169 location: Location,
170 source: common_procedure::Error,
171 },
172
173 #[snafu(display("Failed to stop procedure manager"))]
174 StopProcedureManager {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_procedure::Error,
178 },
179
180 #[snafu(display(
181 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182 ))]
183 ProcedureOutput {
184 procedure_id: String,
185 err_msg: String,
186 #[snafu(implicit)]
187 location: Location,
188 },
189
190 #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191 ConvertRawTableInfo {
192 #[snafu(implicit)]
193 location: Location,
194 source: datatypes::Error,
195 },
196
197 #[snafu(display("Primary key '{key}' not found when creating region request"))]
198 PrimaryKeyNotFound {
199 key: String,
200 #[snafu(implicit)]
201 location: Location,
202 },
203
204 #[snafu(display("Failed to build table meta for table: {}", table_name))]
205 BuildTableMeta {
206 table_name: String,
207 #[snafu(source)]
208 error: table::metadata::TableMetaBuilderError,
209 #[snafu(implicit)]
210 location: Location,
211 },
212
213 #[snafu(display("Table occurs error"))]
214 Table {
215 #[snafu(implicit)]
216 location: Location,
217 source: table::error::Error,
218 },
219
220 #[snafu(display("Failed to find table route for table id {}", table_id))]
221 TableRouteNotFound {
222 table_id: TableId,
223 #[snafu(implicit)]
224 location: Location,
225 },
226
227 #[snafu(display("Failed to decode protobuf"))]
228 DecodeProto {
229 #[snafu(implicit)]
230 location: Location,
231 #[snafu(source)]
232 error: prost::DecodeError,
233 },
234
235 #[snafu(display("Failed to encode object into json"))]
236 EncodeJson {
237 #[snafu(implicit)]
238 location: Location,
239 #[snafu(source)]
240 error: JsonError,
241 },
242
243 #[snafu(display("Failed to decode object from json"))]
244 DecodeJson {
245 #[snafu(implicit)]
246 location: Location,
247 #[snafu(source)]
248 error: JsonError,
249 },
250
251 #[snafu(display("Failed to serialize to json: {}", input))]
252 SerializeToJson {
253 input: String,
254 #[snafu(source)]
255 error: serde_json::error::Error,
256 #[snafu(implicit)]
257 location: Location,
258 },
259
260 #[snafu(display("Failed to deserialize from json: {}", input))]
261 DeserializeFromJson {
262 input: String,
263 #[snafu(source)]
264 error: serde_json::error::Error,
265 #[snafu(implicit)]
266 location: Location,
267 },
268
269 #[snafu(display("Payload not exist"))]
270 PayloadNotExist {
271 #[snafu(implicit)]
272 location: Location,
273 },
274
275 #[snafu(display("Failed to send message: {err_msg}"))]
276 SendMessage {
277 err_msg: String,
278 #[snafu(implicit)]
279 location: Location,
280 },
281
282 #[snafu(display("Failed to serde json"))]
283 SerdeJson {
284 #[snafu(source)]
285 error: serde_json::error::Error,
286 #[snafu(implicit)]
287 location: Location,
288 },
289
290 #[snafu(display("Failed to parse value {} into key {}", value, key))]
291 ParseOption {
292 key: String,
293 value: String,
294 #[snafu(implicit)]
295 location: Location,
296 },
297
298 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
299 RouteInfoCorrupted {
300 err_msg: String,
301 #[snafu(implicit)]
302 location: Location,
303 },
304
305 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
306 IllegalServerState {
307 code: i32,
308 err_msg: String,
309 #[snafu(implicit)]
310 location: Location,
311 },
312
313 #[snafu(display("Failed to convert alter table request"))]
314 ConvertAlterTableRequest {
315 source: common_grpc_expr::error::Error,
316 #[snafu(implicit)]
317 location: Location,
318 },
319
320 #[snafu(display("Invalid protobuf message: {err_msg}"))]
321 InvalidProtoMsg {
322 err_msg: String,
323 #[snafu(implicit)]
324 location: Location,
325 },
326
327 #[snafu(display("Unexpected: {err_msg}"))]
328 Unexpected {
329 err_msg: String,
330 #[snafu(implicit)]
331 location: Location,
332 },
333
334 #[snafu(display("Table already exists, table: {}", table_name))]
335 TableAlreadyExists {
336 table_name: String,
337 #[snafu(implicit)]
338 location: Location,
339 },
340
341 #[snafu(display("View already exists, view: {}", view_name))]
342 ViewAlreadyExists {
343 view_name: String,
344 #[snafu(implicit)]
345 location: Location,
346 },
347
348 #[snafu(display("Flow already exists: {}", flow_name))]
349 FlowAlreadyExists {
350 flow_name: String,
351 #[snafu(implicit)]
352 location: Location,
353 },
354
355 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
356 SchemaAlreadyExists {
357 catalog: String,
358 schema: String,
359 #[snafu(implicit)]
360 location: Location,
361 },
362
363 #[snafu(display("Failed to convert raw key to str"))]
364 ConvertRawKey {
365 #[snafu(implicit)]
366 location: Location,
367 #[snafu(source)]
368 error: Utf8Error,
369 },
370
371 #[snafu(display("Table not found: '{}'", table_name))]
372 TableNotFound {
373 table_name: String,
374 #[snafu(implicit)]
375 location: Location,
376 },
377
378 #[snafu(display("Region not found: {}", region_id))]
379 RegionNotFound {
380 region_id: RegionId,
381 #[snafu(implicit)]
382 location: Location,
383 },
384
385 #[snafu(display("View not found: '{}'", view_name))]
386 ViewNotFound {
387 view_name: String,
388 #[snafu(implicit)]
389 location: Location,
390 },
391
392 #[snafu(display("Flow not found: '{}'", flow_name))]
393 FlowNotFound {
394 flow_name: String,
395 #[snafu(implicit)]
396 location: Location,
397 },
398
399 #[snafu(display("Flow route not found: '{}'", flow_name))]
400 FlowRouteNotFound {
401 flow_name: String,
402 #[snafu(implicit)]
403 location: Location,
404 },
405
406 #[snafu(display("Schema nod found, schema: {}", table_schema))]
407 SchemaNotFound {
408 table_schema: String,
409 #[snafu(implicit)]
410 location: Location,
411 },
412
413 #[snafu(display("Catalog not found, catalog: {}", catalog))]
414 CatalogNotFound {
415 catalog: String,
416 #[snafu(implicit)]
417 location: Location,
418 },
419
420 #[snafu(display("Invalid metadata, err: {}", err_msg))]
421 InvalidMetadata {
422 err_msg: String,
423 #[snafu(implicit)]
424 location: Location,
425 },
426
427 #[snafu(display("Invalid view info, err: {}", err_msg))]
428 InvalidViewInfo {
429 err_msg: String,
430 #[snafu(implicit)]
431 location: Location,
432 },
433
434 #[snafu(display("Invalid flow request body: {:?}", body))]
435 InvalidFlowRequestBody {
436 body: Box<Option<api::v1::flow::flow_request::Body>>,
437 #[snafu(implicit)]
438 location: Location,
439 },
440
441 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
442 GetKvCache { err_msg: String },
443
444 #[snafu(display("Get null from cache, key: {}", key))]
445 CacheNotGet {
446 key: String,
447 #[snafu(implicit)]
448 location: Location,
449 },
450
451 #[snafu(display("Etcd txn error: {err_msg}"))]
452 EtcdTxnOpResponse {
453 err_msg: String,
454 #[snafu(implicit)]
455 location: Location,
456 },
457
458 #[snafu(display("External error"))]
459 External {
460 #[snafu(implicit)]
461 location: Location,
462 source: BoxedError,
463 },
464
465 #[snafu(display("The response exceeded size limit"))]
466 ResponseExceededSizeLimit {
467 #[snafu(implicit)]
468 location: Location,
469 source: BoxedError,
470 },
471
472 #[snafu(display("Invalid heartbeat response"))]
473 InvalidHeartbeatResponse {
474 #[snafu(implicit)]
475 location: Location,
476 },
477
478 #[snafu(display("Failed to operate on datanode: {}", peer))]
479 OperateDatanode {
480 #[snafu(implicit)]
481 location: Location,
482 peer: Peer,
483 source: BoxedError,
484 },
485
486 #[snafu(display("Retry later"))]
487 RetryLater {
488 source: BoxedError,
489 clean_poisons: bool,
490 },
491
492 #[snafu(display("Abort procedure"))]
493 AbortProcedure {
494 #[snafu(implicit)]
495 location: Location,
496 source: BoxedError,
497 clean_poisons: bool,
498 },
499
500 #[snafu(display(
501 "Failed to encode a wal options to json string, wal_options: {:?}",
502 wal_options
503 ))]
504 EncodeWalOptions {
505 wal_options: WalOptions,
506 #[snafu(source)]
507 error: serde_json::Error,
508 #[snafu(implicit)]
509 location: Location,
510 },
511
512 #[snafu(display("Invalid number of topics {}", num_topics))]
513 InvalidNumTopics {
514 num_topics: usize,
515 #[snafu(implicit)]
516 location: Location,
517 },
518
519 #[snafu(display(
520 "Failed to build a Kafka client, broker endpoints: {:?}",
521 broker_endpoints
522 ))]
523 BuildKafkaClient {
524 broker_endpoints: Vec<String>,
525 #[snafu(implicit)]
526 location: Location,
527 #[snafu(source)]
528 error: rskafka::client::error::Error,
529 },
530
531 #[snafu(display("Failed to create TLS Config"))]
532 TlsConfig {
533 #[snafu(implicit)]
534 location: Location,
535 source: common_wal::error::Error,
536 },
537
538 #[snafu(display("Failed to build a Kafka controller client"))]
539 BuildKafkaCtrlClient {
540 #[snafu(implicit)]
541 location: Location,
542 #[snafu(source)]
543 error: rskafka::client::error::Error,
544 },
545
546 #[snafu(display(
547 "Failed to get a Kafka partition client, topic: {}, partition: {}",
548 topic,
549 partition
550 ))]
551 KafkaPartitionClient {
552 topic: String,
553 partition: i32,
554 #[snafu(implicit)]
555 location: Location,
556 #[snafu(source)]
557 error: rskafka::client::error::Error,
558 },
559
560 #[snafu(display(
561 "Failed to get offset from Kafka, topic: {}, partition: {}",
562 topic,
563 partition
564 ))]
565 KafkaGetOffset {
566 topic: String,
567 partition: i32,
568 #[snafu(implicit)]
569 location: Location,
570 #[snafu(source)]
571 error: rskafka::client::error::Error,
572 },
573
574 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
575 ProduceRecord {
576 topic: String,
577 #[snafu(implicit)]
578 location: Location,
579 #[snafu(source)]
580 error: rskafka::client::error::Error,
581 },
582
583 #[snafu(display("Failed to create a Kafka wal topic"))]
584 CreateKafkaWalTopic {
585 #[snafu(implicit)]
586 location: Location,
587 #[snafu(source)]
588 error: rskafka::client::error::Error,
589 },
590
591 #[snafu(display("The topic pool is empty"))]
592 EmptyTopicPool {
593 #[snafu(implicit)]
594 location: Location,
595 },
596
597 #[snafu(display("Unexpected table route type: {}", err_msg))]
598 UnexpectedLogicalRouteTable {
599 #[snafu(implicit)]
600 location: Location,
601 err_msg: String,
602 },
603
604 #[snafu(display("The tasks of {} cannot be empty", name))]
605 EmptyDdlTasks {
606 name: String,
607 #[snafu(implicit)]
608 location: Location,
609 },
610
611 #[snafu(display("Metadata corruption: {}", err_msg))]
612 MetadataCorruption {
613 err_msg: String,
614 #[snafu(implicit)]
615 location: Location,
616 },
617
618 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
619 AlterLogicalTablesInvalidArguments {
620 err_msg: String,
621 #[snafu(implicit)]
622 location: Location,
623 },
624
625 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
626 CreateLogicalTablesInvalidArguments {
627 err_msg: String,
628 #[snafu(implicit)]
629 location: Location,
630 },
631
632 #[snafu(display("Invalid node info key: {}", key))]
633 InvalidNodeInfoKey {
634 key: String,
635 #[snafu(implicit)]
636 location: Location,
637 },
638
639 #[snafu(display("Invalid node stat key: {}", key))]
640 InvalidStatKey {
641 key: String,
642 #[snafu(implicit)]
643 location: Location,
644 },
645
646 #[snafu(display("Failed to parse number: {}", err_msg))]
647 ParseNum {
648 err_msg: String,
649 #[snafu(source)]
650 error: std::num::ParseIntError,
651 #[snafu(implicit)]
652 location: Location,
653 },
654
655 #[snafu(display("Invalid role: {}", role))]
656 InvalidRole {
657 role: i32,
658 #[snafu(implicit)]
659 location: Location,
660 },
661
662 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
663 InvalidSetDatabaseOption {
664 key: String,
665 value: String,
666 #[snafu(implicit)]
667 location: Location,
668 },
669
670 #[snafu(display("Invalid unset database option, key: {}", key))]
671 InvalidUnsetDatabaseOption {
672 key: String,
673 #[snafu(implicit)]
674 location: Location,
675 },
676
677 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
678 MismatchPrefix {
679 prefix: String,
680 key: String,
681 #[snafu(implicit)]
682 location: Location,
683 },
684
685 #[snafu(display("Failed to move values: {err_msg}"))]
686 MoveValues {
687 err_msg: String,
688 #[snafu(implicit)]
689 location: Location,
690 },
691
692 #[snafu(display("Failed to parse {} from utf8", name))]
693 FromUtf8 {
694 name: String,
695 #[snafu(source)]
696 error: std::string::FromUtf8Error,
697 #[snafu(implicit)]
698 location: Location,
699 },
700
701 #[snafu(display("Value not exists"))]
702 ValueNotExist {
703 #[snafu(implicit)]
704 location: Location,
705 },
706
707 #[snafu(display("Failed to get cache"))]
708 GetCache { source: Arc<Error> },
709
710 #[cfg(feature = "pg_kvbackend")]
711 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
712 PostgresExecution {
713 sql: String,
714 #[snafu(source)]
715 error: tokio_postgres::Error,
716 #[snafu(implicit)]
717 location: Location,
718 },
719
720 #[cfg(feature = "pg_kvbackend")]
721 #[snafu(display("Failed to create connection pool for Postgres"))]
722 CreatePostgresPool {
723 #[snafu(source)]
724 error: deadpool_postgres::CreatePoolError,
725 #[snafu(implicit)]
726 location: Location,
727 },
728
729 #[cfg(feature = "pg_kvbackend")]
730 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
731 GetPostgresConnection {
732 reason: String,
733 #[snafu(implicit)]
734 location: Location,
735 },
736
737 #[cfg(feature = "pg_kvbackend")]
738 #[snafu(display("Failed to {} Postgres transaction", operation))]
739 PostgresTransaction {
740 #[snafu(source)]
741 error: tokio_postgres::Error,
742 #[snafu(implicit)]
743 location: Location,
744 operation: String,
745 },
746
747 #[cfg(feature = "pg_kvbackend")]
748 #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
749 PostgresTlsConfig {
750 reason: String,
751 #[snafu(implicit)]
752 location: Location,
753 },
754
755 #[cfg(feature = "pg_kvbackend")]
756 #[snafu(display("Failed to load TLS certificate from path: {}", path))]
757 LoadTlsCertificate {
758 path: String,
759 #[snafu(source)]
760 error: std::io::Error,
761 #[snafu(implicit)]
762 location: Location,
763 },
764
765 #[cfg(feature = "pg_kvbackend")]
766 #[snafu(display("Invalid TLS configuration: {}", reason))]
767 InvalidTlsConfig {
768 reason: String,
769 #[snafu(implicit)]
770 location: Location,
771 },
772
773 #[cfg(feature = "mysql_kvbackend")]
774 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
775 MySqlExecution {
776 sql: String,
777 #[snafu(source)]
778 error: sqlx::Error,
779 #[snafu(implicit)]
780 location: Location,
781 },
782
783 #[cfg(feature = "mysql_kvbackend")]
784 #[snafu(display("Failed to create connection pool for MySql"))]
785 CreateMySqlPool {
786 #[snafu(source)]
787 error: sqlx::Error,
788 #[snafu(implicit)]
789 location: Location,
790 },
791
792 #[cfg(feature = "mysql_kvbackend")]
793 #[snafu(display("Failed to {} MySql transaction", operation))]
794 MySqlTransaction {
795 #[snafu(source)]
796 error: sqlx::Error,
797 #[snafu(implicit)]
798 location: Location,
799 operation: String,
800 },
801
802 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
803 #[snafu(display("Rds transaction retry failed"))]
804 RdsTransactionRetryFailed {
805 #[snafu(implicit)]
806 location: Location,
807 },
808
809 #[snafu(display(
810 "Datanode table info not found, table id: {}, datanode id: {}",
811 table_id,
812 datanode_id
813 ))]
814 DatanodeTableInfoNotFound {
815 datanode_id: DatanodeId,
816 table_id: TableId,
817 #[snafu(implicit)]
818 location: Location,
819 },
820
821 #[snafu(display("Invalid topic name prefix: {}", prefix))]
822 InvalidTopicNamePrefix {
823 prefix: String,
824 #[snafu(implicit)]
825 location: Location,
826 },
827
828 #[snafu(display("Failed to parse wal options: {}", wal_options))]
829 ParseWalOptions {
830 wal_options: String,
831 #[snafu(implicit)]
832 location: Location,
833 #[snafu(source)]
834 error: serde_json::Error,
835 },
836
837 #[snafu(display("No leader found for table_id: {}", table_id))]
838 NoLeader {
839 table_id: TableId,
840 #[snafu(implicit)]
841 location: Location,
842 },
843
844 #[snafu(display(
845 "Procedure poison key already exists with a different value, key: {}, value: {}",
846 key,
847 value
848 ))]
849 ProcedurePoisonConflict {
850 key: String,
851 value: String,
852 #[snafu(implicit)]
853 location: Location,
854 },
855
856 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
857 PutPoison {
858 #[snafu(implicit)]
859 location: Location,
860 #[snafu(source)]
861 source: common_procedure::error::Error,
862 },
863
864 #[snafu(display("Failed to parse timezone"))]
865 InvalidTimeZone {
866 #[snafu(implicit)]
867 location: Location,
868 #[snafu(source)]
869 error: common_time::error::Error,
870 },
871 #[snafu(display("Invalid file path: {}", file_path))]
872 InvalidFilePath {
873 #[snafu(implicit)]
874 location: Location,
875 file_path: String,
876 },
877
878 #[snafu(display("Failed to serialize flexbuffers"))]
879 SerializeFlexbuffers {
880 #[snafu(implicit)]
881 location: Location,
882 #[snafu(source)]
883 error: flexbuffers::SerializationError,
884 },
885
886 #[snafu(display("Failed to deserialize flexbuffers"))]
887 DeserializeFlexbuffers {
888 #[snafu(implicit)]
889 location: Location,
890 #[snafu(source)]
891 error: flexbuffers::DeserializationError,
892 },
893
894 #[snafu(display("Failed to read flexbuffers"))]
895 ReadFlexbuffers {
896 #[snafu(implicit)]
897 location: Location,
898 #[snafu(source)]
899 error: flexbuffers::ReaderError,
900 },
901
902 #[snafu(display("Invalid file name: {}", reason))]
903 InvalidFileName {
904 #[snafu(implicit)]
905 location: Location,
906 reason: String,
907 },
908
909 #[snafu(display("Invalid file extension: {}", reason))]
910 InvalidFileExtension {
911 #[snafu(implicit)]
912 location: Location,
913 reason: String,
914 },
915
916 #[snafu(display("Failed to write object, file path: {}", file_path))]
917 WriteObject {
918 #[snafu(implicit)]
919 location: Location,
920 file_path: String,
921 #[snafu(source)]
922 error: object_store::Error,
923 },
924
925 #[snafu(display("Failed to read object, file path: {}", file_path))]
926 ReadObject {
927 #[snafu(implicit)]
928 location: Location,
929 file_path: String,
930 #[snafu(source)]
931 error: object_store::Error,
932 },
933
934 #[snafu(display("Missing column ids"))]
935 MissingColumnIds {
936 #[snafu(implicit)]
937 location: Location,
938 },
939
940 #[snafu(display(
941 "Missing column in column metadata: {}, table: {}, table_id: {}",
942 column_name,
943 table_name,
944 table_id,
945 ))]
946 MissingColumnInColumnMetadata {
947 column_name: String,
948 #[snafu(implicit)]
949 location: Location,
950 table_name: String,
951 table_id: TableId,
952 },
953
954 #[snafu(display(
955 "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
956 column_name,
957 column_id,
958 table_name,
959 table_id,
960 ))]
961 MismatchColumnId {
962 column_name: String,
963 column_id: u32,
964 #[snafu(implicit)]
965 location: Location,
966 table_name: String,
967 table_id: TableId,
968 },
969
970 #[snafu(display("Failed to convert column def, column: {}", column))]
971 ConvertColumnDef {
972 column: String,
973 #[snafu(implicit)]
974 location: Location,
975 source: api::error::Error,
976 },
977
978 #[snafu(display("Failed to convert time ranges"))]
979 ConvertTimeRanges {
980 #[snafu(implicit)]
981 location: Location,
982 source: api::error::Error,
983 },
984
985 #[snafu(display(
986 "Column metadata inconsistencies found in table: {}, table_id: {}",
987 table_name,
988 table_id
989 ))]
990 ColumnMetadataConflicts {
991 table_name: String,
992 table_id: TableId,
993 },
994
995 #[snafu(display(
996 "Column not found in column metadata, column_name: {}, column_id: {}",
997 column_name,
998 column_id
999 ))]
1000 ColumnNotFound { column_name: String, column_id: u32 },
1001
1002 #[snafu(display(
1003 "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1004 column_name,
1005 expected_column_id,
1006 actual_column_id
1007 ))]
1008 ColumnIdMismatch {
1009 column_name: String,
1010 expected_column_id: u32,
1011 actual_column_id: u32,
1012 },
1013
1014 #[snafu(display(
1015 "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1016 expected_column_name,
1017 expected_column_id,
1018 actual_column_name,
1019 actual_column_id,
1020 ))]
1021 TimestampMismatch {
1022 expected_column_name: String,
1023 expected_column_id: u32,
1024 actual_column_name: String,
1025 actual_column_id: u32,
1026 },
1027
1028 #[cfg(feature = "enterprise")]
1029 #[snafu(display("Too large duration"))]
1030 TooLargeDuration {
1031 #[snafu(source)]
1032 error: prost_types::DurationError,
1033 #[snafu(implicit)]
1034 location: Location,
1035 },
1036
1037 #[cfg(feature = "enterprise")]
1038 #[snafu(display("Negative duration"))]
1039 NegativeDuration {
1040 #[snafu(source)]
1041 error: prost_types::DurationError,
1042 #[snafu(implicit)]
1043 location: Location,
1044 },
1045
1046 #[cfg(feature = "enterprise")]
1047 #[snafu(display("Missing interval field"))]
1048 MissingInterval {
1049 #[snafu(implicit)]
1050 location: Location,
1051 },
1052}
1053
1054pub type Result<T> = std::result::Result<T, Error>;
1055
1056impl ErrorExt for Error {
1057 fn status_code(&self) -> StatusCode {
1058 use Error::*;
1059 match self {
1060 IllegalServerState { .. }
1061 | EtcdTxnOpResponse { .. }
1062 | EtcdFailed { .. }
1063 | EtcdTxnFailed { .. }
1064 | ConnectEtcd { .. }
1065 | MoveValues { .. }
1066 | GetCache { .. }
1067 | SerializeToJson { .. }
1068 | DeserializeFromJson { .. } => StatusCode::Internal,
1069
1070 NoLeader { .. } => StatusCode::TableUnavailable,
1071 ValueNotExist { .. }
1072 | ProcedurePoisonConflict { .. }
1073 | ProcedureStateReceiverNotFound { .. }
1074 | MissingColumnIds { .. }
1075 | MissingColumnInColumnMetadata { .. }
1076 | MismatchColumnId { .. }
1077 | ColumnMetadataConflicts { .. }
1078 | ColumnNotFound { .. }
1079 | ColumnIdMismatch { .. }
1080 | TimestampMismatch { .. } => StatusCode::Unexpected,
1081
1082 Unsupported { .. } => StatusCode::Unsupported,
1083 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1084
1085 SerdeJson { .. }
1086 | ParseOption { .. }
1087 | RouteInfoCorrupted { .. }
1088 | InvalidProtoMsg { .. }
1089 | InvalidMetadata { .. }
1090 | Unexpected { .. }
1091 | TableInfoNotFound { .. }
1092 | NextSequence { .. }
1093 | UnexpectedSequenceValue { .. }
1094 | InvalidHeartbeatResponse { .. }
1095 | EncodeJson { .. }
1096 | DecodeJson { .. }
1097 | PayloadNotExist { .. }
1098 | ConvertRawKey { .. }
1099 | DecodeProto { .. }
1100 | BuildTableMeta { .. }
1101 | TableRouteNotFound { .. }
1102 | ConvertRawTableInfo { .. }
1103 | RegionOperatingRace { .. }
1104 | EncodeWalOptions { .. }
1105 | BuildKafkaClient { .. }
1106 | BuildKafkaCtrlClient { .. }
1107 | KafkaPartitionClient { .. }
1108 | ProduceRecord { .. }
1109 | CreateKafkaWalTopic { .. }
1110 | EmptyTopicPool { .. }
1111 | UnexpectedLogicalRouteTable { .. }
1112 | ProcedureOutput { .. }
1113 | FromUtf8 { .. }
1114 | MetadataCorruption { .. }
1115 | ParseWalOptions { .. }
1116 | KafkaGetOffset { .. }
1117 | ReadFlexbuffers { .. }
1118 | SerializeFlexbuffers { .. }
1119 | DeserializeFlexbuffers { .. }
1120 | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1121
1122 SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1123
1124 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1125
1126 ProcedureNotFound { .. }
1127 | InvalidViewInfo { .. }
1128 | PrimaryKeyNotFound { .. }
1129 | EmptyKey { .. }
1130 | AlterLogicalTablesInvalidArguments { .. }
1131 | CreateLogicalTablesInvalidArguments { .. }
1132 | MismatchPrefix { .. }
1133 | TlsConfig { .. }
1134 | InvalidSetDatabaseOption { .. }
1135 | InvalidUnsetDatabaseOption { .. }
1136 | InvalidTopicNamePrefix { .. }
1137 | InvalidTimeZone { .. }
1138 | InvalidFileExtension { .. }
1139 | InvalidFileName { .. }
1140 | InvalidFlowRequestBody { .. }
1141 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1142
1143 #[cfg(feature = "enterprise")]
1144 MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1145 StatusCode::InvalidArguments
1146 }
1147
1148 FlowNotFound { .. } => StatusCode::FlowNotFound,
1149 FlowRouteNotFound { .. } => StatusCode::Unexpected,
1150 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1151
1152 ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1153 StatusCode::TableNotFound
1154 }
1155 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1156
1157 SubmitProcedure { source, .. }
1158 | QueryProcedure { source, .. }
1159 | WaitProcedure { source, .. }
1160 | StartProcedureManager { source, .. }
1161 | StopProcedureManager { source, .. } => source.status_code(),
1162 RegisterProcedureLoader { source, .. } => source.status_code(),
1163 External { source, .. } => source.status_code(),
1164 ResponseExceededSizeLimit { source, .. } => source.status_code(),
1165 OperateDatanode { source, .. } => source.status_code(),
1166 Table { source, .. } => source.status_code(),
1167 RetryLater { source, .. } => source.status_code(),
1168 AbortProcedure { source, .. } => source.status_code(),
1169 ConvertAlterTableRequest { source, .. } => source.status_code(),
1170 PutPoison { source, .. } => source.status_code(),
1171 ConvertColumnDef { source, .. } => source.status_code(),
1172 ProcedureStateReceiver { source, .. } => source.status_code(),
1173
1174 ParseProcedureId { .. }
1175 | InvalidNumTopics { .. }
1176 | SchemaNotFound { .. }
1177 | CatalogNotFound { .. }
1178 | InvalidNodeInfoKey { .. }
1179 | InvalidStatKey { .. }
1180 | ParseNum { .. }
1181 | InvalidRole { .. }
1182 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1183
1184 #[cfg(feature = "pg_kvbackend")]
1185 PostgresExecution { .. }
1186 | CreatePostgresPool { .. }
1187 | GetPostgresConnection { .. }
1188 | PostgresTransaction { .. }
1189 | PostgresTlsConfig { .. }
1190 | LoadTlsCertificate { .. }
1191 | InvalidTlsConfig { .. } => StatusCode::Internal,
1192 #[cfg(feature = "mysql_kvbackend")]
1193 MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1194 StatusCode::Internal
1195 }
1196 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1197 RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1198 DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1199 }
1200 }
1201
1202 fn as_any(&self) -> &dyn std::any::Any {
1203 self
1204 }
1205}
1206
1207impl Error {
1208 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1209 pub fn is_serialization_error(&self) -> bool {
1211 match self {
1212 #[cfg(feature = "pg_kvbackend")]
1213 Error::PostgresTransaction { error, .. } => {
1214 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1215 }
1216 #[cfg(feature = "pg_kvbackend")]
1217 Error::PostgresExecution { error, .. } => {
1218 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1219 }
1220 #[cfg(feature = "mysql_kvbackend")]
1221 Error::MySqlExecution {
1222 error: sqlx::Error::Database(database_error),
1223 ..
1224 } => {
1225 matches!(
1226 database_error.message(),
1227 "Deadlock found when trying to get lock; try restarting transaction"
1228 | "can't serialize access for this transaction"
1229 )
1230 }
1231 _ => false,
1232 }
1233 }
1234
1235 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1237 Error::RetryLater {
1238 source: BoxedError::new(err),
1239 clean_poisons: false,
1240 }
1241 }
1242
1243 pub fn is_retry_later(&self) -> bool {
1245 matches!(self, Error::RetryLater { .. })
1246 }
1247
1248 pub fn need_clean_poisons(&self) -> bool {
1250 matches!(
1251 self,
1252 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1253 ) || matches!(
1254 self,
1255 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1256 )
1257 }
1258
1259 pub fn is_exceeded_size_limit(&self) -> bool {
1261 match self {
1262 Error::EtcdFailed {
1263 error: etcd_client::Error::GRpcStatus(status),
1264 ..
1265 } => status.code() == tonic::Code::OutOfRange,
1266 Error::ResponseExceededSizeLimit { .. } => true,
1267 _ => false,
1268 }
1269 }
1270}