1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Empty key is not allowed"))]
36 EmptyKey {
37 #[snafu(implicit)]
38 location: Location,
39 },
40
41 #[snafu(display(
42 "Another procedure is operating the region: {} on peer: {}",
43 region_id,
44 peer_id
45 ))]
46 RegionOperatingRace {
47 #[snafu(implicit)]
48 location: Location,
49 peer_id: DatanodeId,
50 region_id: RegionId,
51 },
52
53 #[snafu(display("Failed to connect to Etcd"))]
54 ConnectEtcd {
55 #[snafu(source)]
56 error: etcd_client::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to execute via Etcd"))]
62 EtcdFailed {
63 #[snafu(source)]
64 error: etcd_client::Error,
65 #[snafu(implicit)]
66 location: Location,
67 },
68
69 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70 EtcdTxnFailed {
71 max_operations: usize,
72 #[snafu(source)]
73 error: etcd_client::Error,
74 #[snafu(implicit)]
75 location: Location,
76 },
77
78 #[snafu(display("Failed to get sequence: {}", err_msg))]
79 NextSequence {
80 err_msg: String,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Unexpected sequence value: {}", err_msg))]
86 UnexpectedSequenceValue {
87 err_msg: String,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Table info not found: {}", table))]
93 TableInfoNotFound {
94 table: String,
95 #[snafu(implicit)]
96 location: Location,
97 },
98
99 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100 RegisterProcedureLoader {
101 type_name: String,
102 #[snafu(implicit)]
103 location: Location,
104 source: common_procedure::error::Error,
105 },
106
107 #[snafu(display("Failed to register repartition procedure loader"))]
108 RegisterRepartitionProcedureLoader {
109 #[snafu(implicit)]
110 location: Location,
111 source: BoxedError,
112 },
113
114 #[snafu(display("Failed to create repartition procedure"))]
115 CreateRepartitionProcedure {
116 source: BoxedError,
117 #[snafu(implicit)]
118 location: Location,
119 },
120
121 #[snafu(display("Failed to submit procedure"))]
122 SubmitProcedure {
123 #[snafu(implicit)]
124 location: Location,
125 source: common_procedure::Error,
126 },
127
128 #[snafu(display("Failed to query procedure"))]
129 QueryProcedure {
130 #[snafu(implicit)]
131 location: Location,
132 source: common_procedure::Error,
133 },
134
135 #[snafu(display("Procedure not found: {pid}"))]
136 ProcedureNotFound {
137 #[snafu(implicit)]
138 location: Location,
139 pid: String,
140 },
141
142 #[snafu(display("Failed to parse procedure id: {key}"))]
143 ParseProcedureId {
144 #[snafu(implicit)]
145 location: Location,
146 key: String,
147 #[snafu(source)]
148 error: common_procedure::ParseIdError,
149 },
150
151 #[snafu(display("Unsupported operation {}", operation))]
152 Unsupported {
153 operation: String,
154 #[snafu(implicit)]
155 location: Location,
156 },
157
158 #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
159 ProcedureStateReceiver {
160 procedure_id: ProcedureId,
161 #[snafu(implicit)]
162 location: Location,
163 source: common_procedure::Error,
164 },
165
166 #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
167 ProcedureStateReceiverNotFound {
168 procedure_id: ProcedureId,
169 #[snafu(implicit)]
170 location: Location,
171 },
172
173 #[snafu(display("Failed to wait procedure done"))]
174 WaitProcedure {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_procedure::Error,
178 },
179
180 #[snafu(display("Failed to start procedure manager"))]
181 StartProcedureManager {
182 #[snafu(implicit)]
183 location: Location,
184 source: common_procedure::Error,
185 },
186
187 #[snafu(display("Failed to stop procedure manager"))]
188 StopProcedureManager {
189 #[snafu(implicit)]
190 location: Location,
191 source: common_procedure::Error,
192 },
193
194 #[snafu(display(
195 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
196 ))]
197 ProcedureOutput {
198 procedure_id: String,
199 err_msg: String,
200 #[snafu(implicit)]
201 location: Location,
202 },
203
204 #[snafu(display("Primary key '{key}' not found when creating region request"))]
205 PrimaryKeyNotFound {
206 key: String,
207 #[snafu(implicit)]
208 location: Location,
209 },
210
211 #[snafu(display("Failed to build table meta for table: {}", table_name))]
212 BuildTableMeta {
213 table_name: String,
214 #[snafu(source)]
215 error: table::metadata::TableMetaBuilderError,
216 #[snafu(implicit)]
217 location: Location,
218 },
219
220 #[snafu(display("Table occurs error"))]
221 Table {
222 #[snafu(implicit)]
223 location: Location,
224 source: table::error::Error,
225 },
226
227 #[snafu(display("Failed to find table route for table id {}", table_id))]
228 TableRouteNotFound {
229 table_id: TableId,
230 #[snafu(implicit)]
231 location: Location,
232 },
233
234 #[snafu(display("Failed to find table repartition metadata for table id {}", table_id))]
235 TableRepartNotFound {
236 table_id: TableId,
237 #[snafu(implicit)]
238 location: Location,
239 },
240
241 #[snafu(display("Failed to decode protobuf"))]
242 DecodeProto {
243 #[snafu(implicit)]
244 location: Location,
245 #[snafu(source)]
246 error: prost::DecodeError,
247 },
248
249 #[snafu(display("Failed to encode object into json"))]
250 EncodeJson {
251 #[snafu(implicit)]
252 location: Location,
253 #[snafu(source)]
254 error: JsonError,
255 },
256
257 #[snafu(display("Failed to decode object from json"))]
258 DecodeJson {
259 #[snafu(implicit)]
260 location: Location,
261 #[snafu(source)]
262 error: JsonError,
263 },
264
265 #[snafu(display("Failed to serialize to json: {}", input))]
266 SerializeToJson {
267 input: String,
268 #[snafu(source)]
269 error: serde_json::error::Error,
270 #[snafu(implicit)]
271 location: Location,
272 },
273
274 #[snafu(display("Failed to deserialize from json: {}", input))]
275 DeserializeFromJson {
276 input: String,
277 #[snafu(source)]
278 error: serde_json::error::Error,
279 #[snafu(implicit)]
280 location: Location,
281 },
282
283 #[snafu(display("Payload not exist"))]
284 PayloadNotExist {
285 #[snafu(implicit)]
286 location: Location,
287 },
288
289 #[snafu(display("Failed to serde json"))]
290 SerdeJson {
291 #[snafu(source)]
292 error: serde_json::error::Error,
293 #[snafu(implicit)]
294 location: Location,
295 },
296
297 #[snafu(display("Failed to parse value {} into key {}", value, key))]
298 ParseOption {
299 key: String,
300 value: String,
301 #[snafu(implicit)]
302 location: Location,
303 },
304
305 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
306 RouteInfoCorrupted {
307 err_msg: String,
308 #[snafu(implicit)]
309 location: Location,
310 },
311
312 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
313 IllegalServerState {
314 code: i32,
315 err_msg: String,
316 #[snafu(implicit)]
317 location: Location,
318 },
319
320 #[snafu(display("Failed to convert alter table request"))]
321 ConvertAlterTableRequest {
322 source: common_grpc_expr::error::Error,
323 #[snafu(implicit)]
324 location: Location,
325 },
326
327 #[snafu(display("Invalid protobuf message: {err_msg}"))]
328 InvalidProtoMsg {
329 err_msg: String,
330 #[snafu(implicit)]
331 location: Location,
332 },
333
334 #[snafu(display("Unexpected: {err_msg}"))]
335 Unexpected {
336 err_msg: String,
337 #[snafu(implicit)]
338 location: Location,
339 },
340
341 #[snafu(display("Table already exists, table: {}", table_name))]
342 TableAlreadyExists {
343 table_name: String,
344 #[snafu(implicit)]
345 location: Location,
346 },
347
348 #[snafu(display("View already exists, view: {}", view_name))]
349 ViewAlreadyExists {
350 view_name: String,
351 #[snafu(implicit)]
352 location: Location,
353 },
354
355 #[snafu(display("Flow already exists: {}", flow_name))]
356 FlowAlreadyExists {
357 flow_name: String,
358 #[snafu(implicit)]
359 location: Location,
360 },
361
362 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
363 SchemaAlreadyExists {
364 catalog: String,
365 schema: String,
366 #[snafu(implicit)]
367 location: Location,
368 },
369
370 #[snafu(display("Failed to convert raw key to str"))]
371 ConvertRawKey {
372 #[snafu(implicit)]
373 location: Location,
374 #[snafu(source)]
375 error: Utf8Error,
376 },
377
378 #[snafu(display("Table not found: '{}'", table_name))]
379 TableNotFound {
380 table_name: String,
381 #[snafu(implicit)]
382 location: Location,
383 },
384
385 #[snafu(display("Region not found: {}", region_id))]
386 RegionNotFound {
387 region_id: RegionId,
388 #[snafu(implicit)]
389 location: Location,
390 },
391
392 #[snafu(display("View not found: '{}'", view_name))]
393 ViewNotFound {
394 view_name: String,
395 #[snafu(implicit)]
396 location: Location,
397 },
398
399 #[snafu(display("Flow not found: '{}'", flow_name))]
400 FlowNotFound {
401 flow_name: String,
402 #[snafu(implicit)]
403 location: Location,
404 },
405
406 #[snafu(display("Flow route not found: '{}'", flow_name))]
407 FlowRouteNotFound {
408 flow_name: String,
409 #[snafu(implicit)]
410 location: Location,
411 },
412
413 #[snafu(display("Schema nod found, schema: {}", table_schema))]
414 SchemaNotFound {
415 table_schema: String,
416 #[snafu(implicit)]
417 location: Location,
418 },
419
420 #[snafu(display("Catalog not found, catalog: {}", catalog))]
421 CatalogNotFound {
422 catalog: String,
423 #[snafu(implicit)]
424 location: Location,
425 },
426
427 #[snafu(display("Invalid metadata, err: {}", err_msg))]
428 InvalidMetadata {
429 err_msg: String,
430 #[snafu(implicit)]
431 location: Location,
432 },
433
434 #[snafu(display("Invalid view info, err: {}", err_msg))]
435 InvalidViewInfo {
436 err_msg: String,
437 #[snafu(implicit)]
438 location: Location,
439 },
440
441 #[snafu(display("Invalid flow request body: {:?}", body))]
442 InvalidFlowRequestBody {
443 body: Box<Option<api::v1::flow::flow_request::Body>>,
444 #[snafu(implicit)]
445 location: Location,
446 },
447
448 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
449 GetKvCache { err_msg: String },
450
451 #[snafu(display("Get null from cache, key: {}", key))]
452 CacheNotGet {
453 key: String,
454 #[snafu(implicit)]
455 location: Location,
456 },
457
458 #[snafu(display("Etcd txn error: {err_msg}"))]
459 EtcdTxnOpResponse {
460 err_msg: String,
461 #[snafu(implicit)]
462 location: Location,
463 },
464
465 #[snafu(display("External error"))]
466 External {
467 #[snafu(implicit)]
468 location: Location,
469 source: BoxedError,
470 },
471
472 #[snafu(display("The response exceeded size limit"))]
473 ResponseExceededSizeLimit {
474 #[snafu(implicit)]
475 location: Location,
476 source: BoxedError,
477 },
478
479 #[snafu(display("Invalid heartbeat response"))]
480 InvalidHeartbeatResponse {
481 #[snafu(implicit)]
482 location: Location,
483 },
484
485 #[snafu(display("Failed to operate on datanode: {}", peer))]
486 OperateDatanode {
487 #[snafu(implicit)]
488 location: Location,
489 peer: Peer,
490 source: BoxedError,
491 },
492
493 #[snafu(display("Retry later"))]
494 RetryLater {
495 source: BoxedError,
496 clean_poisons: bool,
497 },
498
499 #[snafu(display("Abort procedure"))]
500 AbortProcedure {
501 #[snafu(implicit)]
502 location: Location,
503 source: BoxedError,
504 clean_poisons: bool,
505 },
506
507 #[snafu(display(
508 "Failed to encode a wal options to json string, wal_options: {:?}",
509 wal_options
510 ))]
511 EncodeWalOptions {
512 wal_options: WalOptions,
513 #[snafu(source)]
514 error: serde_json::Error,
515 #[snafu(implicit)]
516 location: Location,
517 },
518
519 #[snafu(display("Invalid number of topics {}", num_topics))]
520 InvalidNumTopics {
521 num_topics: usize,
522 #[snafu(implicit)]
523 location: Location,
524 },
525
526 #[snafu(display(
527 "Failed to build a Kafka client, broker endpoints: {:?}",
528 broker_endpoints
529 ))]
530 BuildKafkaClient {
531 broker_endpoints: Vec<String>,
532 #[snafu(implicit)]
533 location: Location,
534 #[snafu(source)]
535 error: rskafka::client::error::Error,
536 },
537
538 #[snafu(display("Failed to create TLS Config"))]
539 TlsConfig {
540 #[snafu(implicit)]
541 location: Location,
542 source: common_wal::error::Error,
543 },
544
545 #[snafu(display("Failed to build a Kafka controller client"))]
546 BuildKafkaCtrlClient {
547 #[snafu(implicit)]
548 location: Location,
549 #[snafu(source)]
550 error: rskafka::client::error::Error,
551 },
552
553 #[snafu(display(
554 "Failed to get a Kafka partition client, topic: {}, partition: {}",
555 topic,
556 partition
557 ))]
558 KafkaPartitionClient {
559 topic: String,
560 partition: i32,
561 #[snafu(implicit)]
562 location: Location,
563 #[snafu(source)]
564 error: rskafka::client::error::Error,
565 },
566
567 #[snafu(display(
568 "Failed to get offset from Kafka, topic: {}, partition: {}",
569 topic,
570 partition
571 ))]
572 KafkaGetOffset {
573 topic: String,
574 partition: i32,
575 #[snafu(implicit)]
576 location: Location,
577 #[snafu(source)]
578 error: rskafka::client::error::Error,
579 },
580
581 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
582 ProduceRecord {
583 topic: String,
584 #[snafu(implicit)]
585 location: Location,
586 #[snafu(source)]
587 error: rskafka::client::error::Error,
588 },
589
590 #[snafu(display("Failed to create a Kafka wal topic"))]
591 CreateKafkaWalTopic {
592 #[snafu(implicit)]
593 location: Location,
594 #[snafu(source)]
595 error: rskafka::client::error::Error,
596 },
597
598 #[snafu(display("The topic pool is empty"))]
599 EmptyTopicPool {
600 #[snafu(implicit)]
601 location: Location,
602 },
603
604 #[snafu(display("Unexpected table route type: {}", err_msg))]
605 UnexpectedLogicalRouteTable {
606 #[snafu(implicit)]
607 location: Location,
608 err_msg: String,
609 },
610
611 #[snafu(display("The tasks of {} cannot be empty", name))]
612 EmptyDdlTasks {
613 name: String,
614 #[snafu(implicit)]
615 location: Location,
616 },
617
618 #[snafu(display("Metadata corruption: {}", err_msg))]
619 MetadataCorruption {
620 err_msg: String,
621 #[snafu(implicit)]
622 location: Location,
623 },
624
625 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
626 AlterLogicalTablesInvalidArguments {
627 err_msg: String,
628 #[snafu(implicit)]
629 location: Location,
630 },
631
632 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
633 CreateLogicalTablesInvalidArguments {
634 err_msg: String,
635 #[snafu(implicit)]
636 location: Location,
637 },
638
639 #[snafu(display("Invalid node info key: {}", key))]
640 InvalidNodeInfoKey {
641 key: String,
642 #[snafu(implicit)]
643 location: Location,
644 },
645
646 #[snafu(display("Invalid node stat key: {}", key))]
647 InvalidStatKey {
648 key: String,
649 #[snafu(implicit)]
650 location: Location,
651 },
652
653 #[snafu(display("Failed to parse number: {}", err_msg))]
654 ParseNum {
655 err_msg: String,
656 #[snafu(source)]
657 error: std::num::ParseIntError,
658 #[snafu(implicit)]
659 location: Location,
660 },
661
662 #[snafu(display("Invalid role: {}", role))]
663 InvalidRole {
664 role: i32,
665 #[snafu(implicit)]
666 location: Location,
667 },
668
669 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
670 InvalidSetDatabaseOption {
671 key: String,
672 value: String,
673 #[snafu(implicit)]
674 location: Location,
675 },
676
677 #[snafu(display("Invalid unset database option, key: {}", key))]
678 InvalidUnsetDatabaseOption {
679 key: String,
680 #[snafu(implicit)]
681 location: Location,
682 },
683
684 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
685 MismatchPrefix {
686 prefix: String,
687 key: String,
688 #[snafu(implicit)]
689 location: Location,
690 },
691
692 #[snafu(display("Failed to move values: {err_msg}"))]
693 MoveValues {
694 err_msg: String,
695 #[snafu(implicit)]
696 location: Location,
697 },
698
699 #[snafu(display("Failed to parse {} from utf8", name))]
700 FromUtf8 {
701 name: String,
702 #[snafu(source)]
703 error: std::string::FromUtf8Error,
704 #[snafu(implicit)]
705 location: Location,
706 },
707
708 #[snafu(display("Value not exists"))]
709 ValueNotExist {
710 #[snafu(implicit)]
711 location: Location,
712 },
713
714 #[snafu(display("Failed to get cache"))]
715 GetCache { source: Arc<Error> },
716
717 #[cfg(feature = "pg_kvbackend")]
718 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
719 PostgresExecution {
720 sql: String,
721 #[snafu(source)]
722 error: tokio_postgres::Error,
723 #[snafu(implicit)]
724 location: Location,
725 },
726
727 #[cfg(feature = "pg_kvbackend")]
728 #[snafu(display("Failed to create connection pool for Postgres"))]
729 CreatePostgresPool {
730 #[snafu(source)]
731 error: deadpool_postgres::CreatePoolError,
732 #[snafu(implicit)]
733 location: Location,
734 },
735
736 #[cfg(feature = "pg_kvbackend")]
737 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
738 GetPostgresConnection {
739 reason: String,
740 #[snafu(implicit)]
741 location: Location,
742 },
743
744 #[cfg(feature = "pg_kvbackend")]
745 #[snafu(display("Failed to {} Postgres transaction", operation))]
746 PostgresTransaction {
747 #[snafu(source)]
748 error: tokio_postgres::Error,
749 #[snafu(implicit)]
750 location: Location,
751 operation: String,
752 },
753
754 #[cfg(feature = "pg_kvbackend")]
755 #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
756 PostgresTlsConfig {
757 reason: String,
758 #[snafu(implicit)]
759 location: Location,
760 },
761
762 #[snafu(display("Failed to load TLS certificate from path: {}", path))]
763 LoadTlsCertificate {
764 path: String,
765 #[snafu(source)]
766 error: std::io::Error,
767 #[snafu(implicit)]
768 location: Location,
769 },
770
771 #[cfg(feature = "pg_kvbackend")]
772 #[snafu(display("Invalid TLS configuration: {}", reason))]
773 InvalidTlsConfig {
774 reason: String,
775 #[snafu(implicit)]
776 location: Location,
777 },
778
779 #[cfg(feature = "mysql_kvbackend")]
780 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
781 MySqlExecution {
782 sql: String,
783 #[snafu(source)]
784 error: sqlx::Error,
785 #[snafu(implicit)]
786 location: Location,
787 },
788
789 #[cfg(feature = "mysql_kvbackend")]
790 #[snafu(display("Failed to create connection pool for MySql"))]
791 CreateMySqlPool {
792 #[snafu(source)]
793 error: sqlx::Error,
794 #[snafu(implicit)]
795 location: Location,
796 },
797
798 #[cfg(feature = "mysql_kvbackend")]
799 #[snafu(display("Failed to {} MySql transaction", operation))]
800 MySqlTransaction {
801 #[snafu(source)]
802 error: sqlx::Error,
803 #[snafu(implicit)]
804 location: Location,
805 operation: String,
806 },
807
808 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
809 #[snafu(display("Rds transaction retry failed"))]
810 RdsTransactionRetryFailed {
811 #[snafu(implicit)]
812 location: Location,
813 },
814
815 #[snafu(display(
816 "Datanode table info not found, table id: {}, datanode id: {}",
817 table_id,
818 datanode_id
819 ))]
820 DatanodeTableInfoNotFound {
821 datanode_id: DatanodeId,
822 table_id: TableId,
823 #[snafu(implicit)]
824 location: Location,
825 },
826
827 #[snafu(display("Invalid topic name prefix: {}", prefix))]
828 InvalidTopicNamePrefix {
829 prefix: String,
830 #[snafu(implicit)]
831 location: Location,
832 },
833
834 #[snafu(display("Failed to parse wal options: {}", wal_options))]
835 ParseWalOptions {
836 wal_options: String,
837 #[snafu(implicit)]
838 location: Location,
839 #[snafu(source)]
840 error: serde_json::Error,
841 },
842
843 #[snafu(display("No leader found for table_id: {}", table_id))]
844 NoLeader {
845 table_id: TableId,
846 #[snafu(implicit)]
847 location: Location,
848 },
849
850 #[snafu(display(
851 "Procedure poison key already exists with a different value, key: {}, value: {}",
852 key,
853 value
854 ))]
855 ProcedurePoisonConflict {
856 key: String,
857 value: String,
858 #[snafu(implicit)]
859 location: Location,
860 },
861
862 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
863 PutPoison {
864 #[snafu(implicit)]
865 location: Location,
866 #[snafu(source)]
867 source: common_procedure::error::Error,
868 },
869
870 #[snafu(display("Invalid file path: {}", file_path))]
871 InvalidFilePath {
872 #[snafu(implicit)]
873 location: Location,
874 file_path: String,
875 },
876
877 #[snafu(display("Failed to serialize flexbuffers"))]
878 SerializeFlexbuffers {
879 #[snafu(implicit)]
880 location: Location,
881 #[snafu(source)]
882 error: flexbuffers::SerializationError,
883 },
884
885 #[snafu(display("Failed to deserialize flexbuffers"))]
886 DeserializeFlexbuffers {
887 #[snafu(implicit)]
888 location: Location,
889 #[snafu(source)]
890 error: flexbuffers::DeserializationError,
891 },
892
893 #[snafu(display("Failed to read flexbuffers"))]
894 ReadFlexbuffers {
895 #[snafu(implicit)]
896 location: Location,
897 #[snafu(source)]
898 error: flexbuffers::ReaderError,
899 },
900
901 #[snafu(display("Invalid file name: {}", reason))]
902 InvalidFileName {
903 #[snafu(implicit)]
904 location: Location,
905 reason: String,
906 },
907
908 #[snafu(display("Invalid file extension: {}", reason))]
909 InvalidFileExtension {
910 #[snafu(implicit)]
911 location: Location,
912 reason: String,
913 },
914
915 #[snafu(display("Failed to write object, file path: {}", file_path))]
916 WriteObject {
917 #[snafu(implicit)]
918 location: Location,
919 file_path: String,
920 #[snafu(source)]
921 error: object_store::Error,
922 },
923
924 #[snafu(display("Failed to read object, file path: {}", file_path))]
925 ReadObject {
926 #[snafu(implicit)]
927 location: Location,
928 file_path: String,
929 #[snafu(source)]
930 error: object_store::Error,
931 },
932
933 #[snafu(display("Missing column ids"))]
934 MissingColumnIds {
935 #[snafu(implicit)]
936 location: Location,
937 },
938
939 #[snafu(display(
940 "Missing column in column metadata: {}, table: {}, table_id: {}",
941 column_name,
942 table_name,
943 table_id,
944 ))]
945 MissingColumnInColumnMetadata {
946 column_name: String,
947 #[snafu(implicit)]
948 location: Location,
949 table_name: String,
950 table_id: TableId,
951 },
952
953 #[snafu(display(
954 "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
955 column_name,
956 column_id,
957 table_name,
958 table_id,
959 ))]
960 MismatchColumnId {
961 column_name: String,
962 column_id: u32,
963 #[snafu(implicit)]
964 location: Location,
965 table_name: String,
966 table_id: TableId,
967 },
968
969 #[snafu(display("Failed to convert column def, column: {}", column))]
970 ConvertColumnDef {
971 column: String,
972 #[snafu(implicit)]
973 location: Location,
974 source: api::error::Error,
975 },
976
977 #[snafu(display("Failed to convert time ranges"))]
978 ConvertTimeRanges {
979 #[snafu(implicit)]
980 location: Location,
981 source: api::error::Error,
982 },
983
984 #[snafu(display(
985 "Column metadata inconsistencies found in table: {}, table_id: {}",
986 table_name,
987 table_id
988 ))]
989 ColumnMetadataConflicts {
990 table_name: String,
991 table_id: TableId,
992 },
993
994 #[snafu(display(
995 "Column not found in column metadata, column_name: {}, column_id: {}",
996 column_name,
997 column_id
998 ))]
999 ColumnNotFound { column_name: String, column_id: u32 },
1000
1001 #[snafu(display(
1002 "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1003 column_name,
1004 expected_column_id,
1005 actual_column_id
1006 ))]
1007 ColumnIdMismatch {
1008 column_name: String,
1009 expected_column_id: u32,
1010 actual_column_id: u32,
1011 },
1012
1013 #[snafu(display(
1014 "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1015 expected_column_name,
1016 expected_column_id,
1017 actual_column_name,
1018 actual_column_id,
1019 ))]
1020 TimestampMismatch {
1021 expected_column_name: String,
1022 expected_column_id: u32,
1023 actual_column_name: String,
1024 actual_column_id: u32,
1025 },
1026
1027 #[cfg(feature = "enterprise")]
1028 #[snafu(display("Too large duration"))]
1029 TooLargeDuration {
1030 #[snafu(source)]
1031 error: prost_types::DurationError,
1032 #[snafu(implicit)]
1033 location: Location,
1034 },
1035
1036 #[cfg(feature = "enterprise")]
1037 #[snafu(display("Negative duration"))]
1038 NegativeDuration {
1039 #[snafu(source)]
1040 error: prost_types::DurationError,
1041 #[snafu(implicit)]
1042 location: Location,
1043 },
1044
1045 #[cfg(feature = "enterprise")]
1046 #[snafu(display("Missing interval field"))]
1047 MissingInterval {
1048 #[snafu(implicit)]
1049 location: Location,
1050 },
1051}
1052
1053pub type Result<T> = std::result::Result<T, Error>;
1054
1055impl ErrorExt for Error {
1056 fn status_code(&self) -> StatusCode {
1057 use Error::*;
1058 match self {
1059 IllegalServerState { .. }
1060 | EtcdTxnOpResponse { .. }
1061 | EtcdFailed { .. }
1062 | EtcdTxnFailed { .. }
1063 | ConnectEtcd { .. }
1064 | MoveValues { .. }
1065 | GetCache { .. }
1066 | SerializeToJson { .. }
1067 | DeserializeFromJson { .. } => StatusCode::Internal,
1068
1069 NoLeader { .. } => StatusCode::TableUnavailable,
1070 ValueNotExist { .. }
1071 | ProcedurePoisonConflict { .. }
1072 | ProcedureStateReceiverNotFound { .. }
1073 | MissingColumnIds { .. }
1074 | MissingColumnInColumnMetadata { .. }
1075 | MismatchColumnId { .. }
1076 | ColumnMetadataConflicts { .. }
1077 | ColumnNotFound { .. }
1078 | ColumnIdMismatch { .. }
1079 | TimestampMismatch { .. } => StatusCode::Unexpected,
1080
1081 Unsupported { .. } => StatusCode::Unsupported,
1082 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1083
1084 SerdeJson { .. }
1085 | ParseOption { .. }
1086 | RouteInfoCorrupted { .. }
1087 | InvalidProtoMsg { .. }
1088 | InvalidMetadata { .. }
1089 | Unexpected { .. }
1090 | TableInfoNotFound { .. }
1091 | NextSequence { .. }
1092 | UnexpectedSequenceValue { .. }
1093 | InvalidHeartbeatResponse { .. }
1094 | EncodeJson { .. }
1095 | DecodeJson { .. }
1096 | PayloadNotExist { .. }
1097 | ConvertRawKey { .. }
1098 | DecodeProto { .. }
1099 | BuildTableMeta { .. }
1100 | TableRouteNotFound { .. }
1101 | TableRepartNotFound { .. }
1102 | RegionOperatingRace { .. }
1103 | EncodeWalOptions { .. }
1104 | BuildKafkaClient { .. }
1105 | BuildKafkaCtrlClient { .. }
1106 | KafkaPartitionClient { .. }
1107 | ProduceRecord { .. }
1108 | CreateKafkaWalTopic { .. }
1109 | EmptyTopicPool { .. }
1110 | UnexpectedLogicalRouteTable { .. }
1111 | ProcedureOutput { .. }
1112 | FromUtf8 { .. }
1113 | MetadataCorruption { .. }
1114 | ParseWalOptions { .. }
1115 | KafkaGetOffset { .. }
1116 | ReadFlexbuffers { .. }
1117 | SerializeFlexbuffers { .. }
1118 | DeserializeFlexbuffers { .. }
1119 | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1120
1121 GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1122
1123 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1124
1125 ProcedureNotFound { .. }
1126 | InvalidViewInfo { .. }
1127 | PrimaryKeyNotFound { .. }
1128 | EmptyKey { .. }
1129 | AlterLogicalTablesInvalidArguments { .. }
1130 | CreateLogicalTablesInvalidArguments { .. }
1131 | MismatchPrefix { .. }
1132 | TlsConfig { .. }
1133 | InvalidSetDatabaseOption { .. }
1134 | InvalidUnsetDatabaseOption { .. }
1135 | InvalidTopicNamePrefix { .. }
1136 | InvalidFileExtension { .. }
1137 | InvalidFileName { .. }
1138 | InvalidFlowRequestBody { .. }
1139 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1140
1141 #[cfg(feature = "enterprise")]
1142 MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1143 StatusCode::InvalidArguments
1144 }
1145
1146 FlowNotFound { .. } => StatusCode::FlowNotFound,
1147 FlowRouteNotFound { .. } => StatusCode::Unexpected,
1148 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1149
1150 ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1151 StatusCode::TableNotFound
1152 }
1153 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1154
1155 SubmitProcedure { source, .. }
1156 | QueryProcedure { source, .. }
1157 | WaitProcedure { source, .. }
1158 | StartProcedureManager { source, .. }
1159 | StopProcedureManager { source, .. } => source.status_code(),
1160 RegisterProcedureLoader { source, .. } => source.status_code(),
1161 External { source, .. } => source.status_code(),
1162 ResponseExceededSizeLimit { source, .. } => source.status_code(),
1163 OperateDatanode { source, .. } => source.status_code(),
1164 Table { source, .. } => source.status_code(),
1165 RetryLater { source, .. } => source.status_code(),
1166 AbortProcedure { source, .. } => source.status_code(),
1167 ConvertAlterTableRequest { source, .. } => source.status_code(),
1168 PutPoison { source, .. } => source.status_code(),
1169 ConvertColumnDef { source, .. } => source.status_code(),
1170 ProcedureStateReceiver { source, .. } => source.status_code(),
1171 RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
1172 CreateRepartitionProcedure { source, .. } => source.status_code(),
1173
1174 ParseProcedureId { .. }
1175 | InvalidNumTopics { .. }
1176 | SchemaNotFound { .. }
1177 | CatalogNotFound { .. }
1178 | InvalidNodeInfoKey { .. }
1179 | InvalidStatKey { .. }
1180 | ParseNum { .. }
1181 | InvalidRole { .. }
1182 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1183
1184 LoadTlsCertificate { .. } => StatusCode::Internal,
1185
1186 #[cfg(feature = "pg_kvbackend")]
1187 PostgresExecution { .. }
1188 | CreatePostgresPool { .. }
1189 | GetPostgresConnection { .. }
1190 | PostgresTransaction { .. }
1191 | PostgresTlsConfig { .. }
1192 | InvalidTlsConfig { .. } => StatusCode::Internal,
1193 #[cfg(feature = "mysql_kvbackend")]
1194 MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1195 StatusCode::Internal
1196 }
1197 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1198 RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1199 DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1200 }
1201 }
1202
1203 fn as_any(&self) -> &dyn std::any::Any {
1204 self
1205 }
1206}
1207
1208impl Error {
1209 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1210 pub fn is_serialization_error(&self) -> bool {
1212 match self {
1213 #[cfg(feature = "pg_kvbackend")]
1214 Error::PostgresTransaction { error, .. } => {
1215 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1216 }
1217 #[cfg(feature = "pg_kvbackend")]
1218 Error::PostgresExecution { error, .. } => {
1219 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1220 }
1221 #[cfg(feature = "mysql_kvbackend")]
1222 Error::MySqlExecution {
1223 error: sqlx::Error::Database(database_error),
1224 ..
1225 } => {
1226 matches!(
1227 database_error.message(),
1228 "Deadlock found when trying to get lock; try restarting transaction"
1229 | "can't serialize access for this transaction"
1230 )
1231 }
1232 _ => false,
1233 }
1234 }
1235
1236 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1238 Error::RetryLater {
1239 source: BoxedError::new(err),
1240 clean_poisons: false,
1241 }
1242 }
1243
1244 pub fn is_retry_later(&self) -> bool {
1246 matches!(self, Error::RetryLater { .. })
1247 }
1248
1249 pub fn need_clean_poisons(&self) -> bool {
1251 matches!(
1252 self,
1253 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1254 ) || matches!(
1255 self,
1256 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1257 )
1258 }
1259
1260 pub fn is_exceeded_size_limit(&self) -> bool {
1262 match self {
1263 Error::EtcdFailed {
1264 error: etcd_client::Error::GRpcStatus(status),
1265 ..
1266 } => status.code() == tonic::Code::OutOfRange,
1267 Error::ResponseExceededSizeLimit { .. } => true,
1268 _ => false,
1269 }
1270 }
1271}