1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Empty key is not allowed"))]
36 EmptyKey {
37 #[snafu(implicit)]
38 location: Location,
39 },
40
41 #[snafu(display(
42 "Another procedure is operating the region: {} on peer: {}",
43 region_id,
44 peer_id
45 ))]
46 RegionOperatingRace {
47 #[snafu(implicit)]
48 location: Location,
49 peer_id: DatanodeId,
50 region_id: RegionId,
51 },
52
53 #[snafu(display("Failed to connect to Etcd"))]
54 ConnectEtcd {
55 #[snafu(source)]
56 error: etcd_client::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to execute via Etcd"))]
62 EtcdFailed {
63 #[snafu(source)]
64 error: etcd_client::Error,
65 #[snafu(implicit)]
66 location: Location,
67 },
68
69 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70 EtcdTxnFailed {
71 max_operations: usize,
72 #[snafu(source)]
73 error: etcd_client::Error,
74 #[snafu(implicit)]
75 location: Location,
76 },
77
78 #[snafu(display("Failed to get sequence: {}", err_msg))]
79 NextSequence {
80 err_msg: String,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Unexpected sequence value: {}", err_msg))]
86 UnexpectedSequenceValue {
87 err_msg: String,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Table info not found: {}", table))]
93 TableInfoNotFound {
94 table: String,
95 #[snafu(implicit)]
96 location: Location,
97 },
98
99 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100 RegisterProcedureLoader {
101 type_name: String,
102 #[snafu(implicit)]
103 location: Location,
104 source: common_procedure::error::Error,
105 },
106
107 #[snafu(display("Failed to register repartition procedure loader"))]
108 RegisterRepartitionProcedureLoader {
109 #[snafu(implicit)]
110 location: Location,
111 source: BoxedError,
112 },
113
114 #[snafu(display("Failed to create repartition procedure"))]
115 CreateRepartitionProcedure {
116 source: BoxedError,
117 #[snafu(implicit)]
118 location: Location,
119 },
120
121 #[snafu(display("Failed to submit procedure"))]
122 SubmitProcedure {
123 #[snafu(implicit)]
124 location: Location,
125 source: common_procedure::Error,
126 },
127
128 #[snafu(display("Failed to query procedure"))]
129 QueryProcedure {
130 #[snafu(implicit)]
131 location: Location,
132 source: common_procedure::Error,
133 },
134
135 #[snafu(display("Procedure not found: {pid}"))]
136 ProcedureNotFound {
137 #[snafu(implicit)]
138 location: Location,
139 pid: String,
140 },
141
142 #[snafu(display("Failed to parse procedure id: {key}"))]
143 ParseProcedureId {
144 #[snafu(implicit)]
145 location: Location,
146 key: String,
147 #[snafu(source)]
148 error: common_procedure::ParseIdError,
149 },
150
151 #[snafu(display("Unsupported operation {}", operation))]
152 Unsupported {
153 operation: String,
154 #[snafu(implicit)]
155 location: Location,
156 },
157
158 #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
159 ProcedureStateReceiver {
160 procedure_id: ProcedureId,
161 #[snafu(implicit)]
162 location: Location,
163 source: common_procedure::Error,
164 },
165
166 #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
167 ProcedureStateReceiverNotFound {
168 procedure_id: ProcedureId,
169 #[snafu(implicit)]
170 location: Location,
171 },
172
173 #[snafu(display("Failed to wait procedure done"))]
174 WaitProcedure {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_procedure::Error,
178 },
179
180 #[snafu(display("Failed to start procedure manager"))]
181 StartProcedureManager {
182 #[snafu(implicit)]
183 location: Location,
184 source: common_procedure::Error,
185 },
186
187 #[snafu(display("Failed to stop procedure manager"))]
188 StopProcedureManager {
189 #[snafu(implicit)]
190 location: Location,
191 source: common_procedure::Error,
192 },
193
194 #[snafu(display(
195 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
196 ))]
197 ProcedureOutput {
198 procedure_id: String,
199 err_msg: String,
200 #[snafu(implicit)]
201 location: Location,
202 },
203
204 #[snafu(display("Primary key '{key}' not found when creating region request"))]
205 PrimaryKeyNotFound {
206 key: String,
207 #[snafu(implicit)]
208 location: Location,
209 },
210
211 #[snafu(display("Failed to build table meta for table: {}", table_name))]
212 BuildTableMeta {
213 table_name: String,
214 #[snafu(source)]
215 error: table::metadata::TableMetaBuilderError,
216 #[snafu(implicit)]
217 location: Location,
218 },
219
220 #[snafu(display("Table occurs error"))]
221 Table {
222 #[snafu(implicit)]
223 location: Location,
224 source: table::error::Error,
225 },
226
227 #[snafu(display("Failed to find table route for table id {}", table_id))]
228 TableRouteNotFound {
229 table_id: TableId,
230 #[snafu(implicit)]
231 location: Location,
232 },
233
234 #[snafu(display("Failed to find table repartition metadata for table id {}", table_id))]
235 TableRepartNotFound {
236 table_id: TableId,
237 #[snafu(implicit)]
238 location: Location,
239 },
240
241 #[snafu(display("Failed to decode protobuf"))]
242 DecodeProto {
243 #[snafu(implicit)]
244 location: Location,
245 #[snafu(source)]
246 error: prost::DecodeError,
247 },
248
249 #[snafu(display("Failed to encode object into json"))]
250 EncodeJson {
251 #[snafu(implicit)]
252 location: Location,
253 #[snafu(source)]
254 error: JsonError,
255 },
256
257 #[snafu(display("Failed to decode object from json"))]
258 DecodeJson {
259 #[snafu(implicit)]
260 location: Location,
261 #[snafu(source)]
262 error: JsonError,
263 },
264
265 #[snafu(display("Failed to serialize to json: {}", input))]
266 SerializeToJson {
267 input: String,
268 #[snafu(source)]
269 error: serde_json::error::Error,
270 #[snafu(implicit)]
271 location: Location,
272 },
273
274 #[snafu(display("Failed to deserialize from json: {}", input))]
275 DeserializeFromJson {
276 input: String,
277 #[snafu(source)]
278 error: serde_json::error::Error,
279 #[snafu(implicit)]
280 location: Location,
281 },
282
283 #[snafu(display("Payload not exist"))]
284 PayloadNotExist {
285 #[snafu(implicit)]
286 location: Location,
287 },
288
289 #[snafu(display("Failed to serde json"))]
290 SerdeJson {
291 #[snafu(source)]
292 error: serde_json::error::Error,
293 #[snafu(implicit)]
294 location: Location,
295 },
296
297 #[snafu(display("Failed to parse value {} into key {}", value, key))]
298 ParseOption {
299 key: String,
300 value: String,
301 #[snafu(implicit)]
302 location: Location,
303 },
304
305 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
306 RouteInfoCorrupted {
307 err_msg: String,
308 #[snafu(implicit)]
309 location: Location,
310 },
311
312 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
313 IllegalServerState {
314 code: i32,
315 err_msg: String,
316 #[snafu(implicit)]
317 location: Location,
318 },
319
320 #[snafu(display("Failed to convert alter table request"))]
321 ConvertAlterTableRequest {
322 source: common_grpc_expr::error::Error,
323 #[snafu(implicit)]
324 location: Location,
325 },
326
327 #[snafu(display("Invalid protobuf message: {err_msg}"))]
328 InvalidProtoMsg {
329 err_msg: String,
330 #[snafu(implicit)]
331 location: Location,
332 },
333
334 #[snafu(display("Unexpected: {err_msg}"))]
335 Unexpected {
336 err_msg: String,
337 #[snafu(implicit)]
338 location: Location,
339 },
340
341 #[snafu(display("Metasrv election has no leader at this moment"))]
342 ElectionNoLeader {
343 #[snafu(implicit)]
344 location: Location,
345 },
346
347 #[snafu(display("Metasrv election leader lease expired"))]
348 ElectionLeaderLeaseExpired {
349 #[snafu(implicit)]
350 location: Location,
351 },
352
353 #[snafu(display("Metasrv election leader lease changed during election"))]
354 ElectionLeaderLeaseChanged {
355 #[snafu(implicit)]
356 location: Location,
357 },
358
359 #[snafu(display("Table already exists, table: {}", table_name))]
360 TableAlreadyExists {
361 table_name: String,
362 #[snafu(implicit)]
363 location: Location,
364 },
365
366 #[snafu(display("View already exists, view: {}", view_name))]
367 ViewAlreadyExists {
368 view_name: String,
369 #[snafu(implicit)]
370 location: Location,
371 },
372
373 #[snafu(display("Flow already exists: {}", flow_name))]
374 FlowAlreadyExists {
375 flow_name: String,
376 #[snafu(implicit)]
377 location: Location,
378 },
379
380 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
381 SchemaAlreadyExists {
382 catalog: String,
383 schema: String,
384 #[snafu(implicit)]
385 location: Location,
386 },
387
388 #[snafu(display("Failed to convert raw key to str"))]
389 ConvertRawKey {
390 #[snafu(implicit)]
391 location: Location,
392 #[snafu(source)]
393 error: Utf8Error,
394 },
395
396 #[snafu(display("Table not found: '{}'", table_name))]
397 TableNotFound {
398 table_name: String,
399 #[snafu(implicit)]
400 location: Location,
401 },
402
403 #[snafu(display("Region not found: {}", region_id))]
404 RegionNotFound {
405 region_id: RegionId,
406 #[snafu(implicit)]
407 location: Location,
408 },
409
410 #[snafu(display("View not found: '{}'", view_name))]
411 ViewNotFound {
412 view_name: String,
413 #[snafu(implicit)]
414 location: Location,
415 },
416
417 #[snafu(display("Flow not found: '{}'", flow_name))]
418 FlowNotFound {
419 flow_name: String,
420 #[snafu(implicit)]
421 location: Location,
422 },
423
424 #[snafu(display("Flow route not found: '{}'", flow_name))]
425 FlowRouteNotFound {
426 flow_name: String,
427 #[snafu(implicit)]
428 location: Location,
429 },
430
431 #[snafu(display("Schema nod found, schema: {}", table_schema))]
432 SchemaNotFound {
433 table_schema: String,
434 #[snafu(implicit)]
435 location: Location,
436 },
437
438 #[snafu(display("Catalog not found, catalog: {}", catalog))]
439 CatalogNotFound {
440 catalog: String,
441 #[snafu(implicit)]
442 location: Location,
443 },
444
445 #[snafu(display("Invalid metadata, err: {}", err_msg))]
446 InvalidMetadata {
447 err_msg: String,
448 #[snafu(implicit)]
449 location: Location,
450 },
451
452 #[snafu(display("Invalid view info, err: {}", err_msg))]
453 InvalidViewInfo {
454 err_msg: String,
455 #[snafu(implicit)]
456 location: Location,
457 },
458
459 #[snafu(display("Invalid flow request body: {:?}", body))]
460 InvalidFlowRequestBody {
461 body: Box<Option<api::v1::flow::flow_request::Body>>,
462 #[snafu(implicit)]
463 location: Location,
464 },
465
466 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
467 GetKvCache { err_msg: String },
468
469 #[snafu(display("Get null from cache, key: {}", key))]
470 CacheNotGet {
471 key: String,
472 #[snafu(implicit)]
473 location: Location,
474 },
475
476 #[snafu(display("Etcd txn error: {err_msg}"))]
477 EtcdTxnOpResponse {
478 err_msg: String,
479 #[snafu(implicit)]
480 location: Location,
481 },
482
483 #[snafu(display("External error"))]
484 External {
485 #[snafu(implicit)]
486 location: Location,
487 source: BoxedError,
488 },
489
490 #[snafu(display("The response exceeded size limit"))]
491 ResponseExceededSizeLimit {
492 #[snafu(implicit)]
493 location: Location,
494 source: BoxedError,
495 },
496
497 #[snafu(display("Invalid heartbeat response"))]
498 InvalidHeartbeatResponse {
499 #[snafu(implicit)]
500 location: Location,
501 },
502
503 #[snafu(display("Failed to operate on datanode: {}", peer))]
504 OperateDatanode {
505 #[snafu(implicit)]
506 location: Location,
507 peer: Peer,
508 source: BoxedError,
509 },
510
511 #[snafu(display("Retry later"))]
512 RetryLater {
513 source: BoxedError,
514 clean_poisons: bool,
515 },
516
517 #[snafu(display("Abort procedure"))]
518 AbortProcedure {
519 #[snafu(implicit)]
520 location: Location,
521 source: BoxedError,
522 clean_poisons: bool,
523 },
524
525 #[snafu(display(
526 "Failed to encode a wal options to json string, wal_options: {:?}",
527 wal_options
528 ))]
529 EncodeWalOptions {
530 wal_options: WalOptions,
531 #[snafu(source)]
532 error: serde_json::Error,
533 #[snafu(implicit)]
534 location: Location,
535 },
536
537 #[snafu(display("Invalid number of topics {}", num_topics))]
538 InvalidNumTopics {
539 num_topics: usize,
540 #[snafu(implicit)]
541 location: Location,
542 },
543
544 #[snafu(display(
545 "Failed to build a Kafka client, broker endpoints: {:?}",
546 broker_endpoints
547 ))]
548 BuildKafkaClient {
549 broker_endpoints: Vec<String>,
550 #[snafu(implicit)]
551 location: Location,
552 #[snafu(source)]
553 error: rskafka::client::error::Error,
554 },
555
556 #[snafu(display("Failed to create TLS Config"))]
557 TlsConfig {
558 #[snafu(implicit)]
559 location: Location,
560 source: common_wal::error::Error,
561 },
562
563 #[snafu(display("Failed to build a Kafka controller client"))]
564 BuildKafkaCtrlClient {
565 #[snafu(implicit)]
566 location: Location,
567 #[snafu(source)]
568 error: rskafka::client::error::Error,
569 },
570
571 #[snafu(display(
572 "Failed to get a Kafka partition client, topic: {}, partition: {}",
573 topic,
574 partition
575 ))]
576 KafkaPartitionClient {
577 topic: String,
578 partition: i32,
579 #[snafu(implicit)]
580 location: Location,
581 #[snafu(source)]
582 error: rskafka::client::error::Error,
583 },
584
585 #[snafu(display(
586 "Failed to get offset from Kafka, topic: {}, partition: {}",
587 topic,
588 partition
589 ))]
590 KafkaGetOffset {
591 topic: String,
592 partition: i32,
593 #[snafu(implicit)]
594 location: Location,
595 #[snafu(source)]
596 error: rskafka::client::error::Error,
597 },
598
599 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
600 ProduceRecord {
601 topic: String,
602 #[snafu(implicit)]
603 location: Location,
604 #[snafu(source)]
605 error: rskafka::client::error::Error,
606 },
607
608 #[snafu(display("Failed to create a Kafka wal topic"))]
609 CreateKafkaWalTopic {
610 #[snafu(implicit)]
611 location: Location,
612 #[snafu(source)]
613 error: rskafka::client::error::Error,
614 },
615
616 #[snafu(display("The topic pool is empty"))]
617 EmptyTopicPool {
618 #[snafu(implicit)]
619 location: Location,
620 },
621
622 #[snafu(display("Unexpected table route type: {}", err_msg))]
623 UnexpectedLogicalRouteTable {
624 #[snafu(implicit)]
625 location: Location,
626 err_msg: String,
627 },
628
629 #[snafu(display("The tasks of {} cannot be empty", name))]
630 EmptyDdlTasks {
631 name: String,
632 #[snafu(implicit)]
633 location: Location,
634 },
635
636 #[snafu(display("Metadata corruption: {}", err_msg))]
637 MetadataCorruption {
638 err_msg: String,
639 #[snafu(implicit)]
640 location: Location,
641 },
642
643 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
644 AlterLogicalTablesInvalidArguments {
645 err_msg: String,
646 #[snafu(implicit)]
647 location: Location,
648 },
649
650 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
651 CreateLogicalTablesInvalidArguments {
652 err_msg: String,
653 #[snafu(implicit)]
654 location: Location,
655 },
656
657 #[snafu(display("Invalid node info key: {}", key))]
658 InvalidNodeInfoKey {
659 key: String,
660 #[snafu(implicit)]
661 location: Location,
662 },
663
664 #[snafu(display("Invalid node stat key: {}", key))]
665 InvalidStatKey {
666 key: String,
667 #[snafu(implicit)]
668 location: Location,
669 },
670
671 #[snafu(display("Failed to parse number: {}", err_msg))]
672 ParseNum {
673 err_msg: String,
674 #[snafu(source)]
675 error: std::num::ParseIntError,
676 #[snafu(implicit)]
677 location: Location,
678 },
679
680 #[snafu(display("Invalid role: {}", role))]
681 InvalidRole {
682 role: i32,
683 #[snafu(implicit)]
684 location: Location,
685 },
686
687 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
688 InvalidSetDatabaseOption {
689 key: String,
690 value: String,
691 #[snafu(implicit)]
692 location: Location,
693 },
694
695 #[snafu(display("Invalid unset database option, key: {}", key))]
696 InvalidUnsetDatabaseOption {
697 key: String,
698 #[snafu(implicit)]
699 location: Location,
700 },
701
702 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
703 MismatchPrefix {
704 prefix: String,
705 key: String,
706 #[snafu(implicit)]
707 location: Location,
708 },
709
710 #[snafu(display("Failed to move values: {err_msg}"))]
711 MoveValues {
712 err_msg: String,
713 #[snafu(implicit)]
714 location: Location,
715 },
716
717 #[snafu(display("Failed to parse {} from utf8", name))]
718 FromUtf8 {
719 name: String,
720 #[snafu(source)]
721 error: std::string::FromUtf8Error,
722 #[snafu(implicit)]
723 location: Location,
724 },
725
726 #[snafu(display("Value not exists"))]
727 ValueNotExist {
728 #[snafu(implicit)]
729 location: Location,
730 },
731
732 #[snafu(display("Failed to get cache"))]
733 GetCache { source: Arc<Error> },
734
735 #[snafu(display(
736 "Failed to get latest cache value after {} attempts due to concurrent invalidation",
737 attempts
738 ))]
739 GetLatestCacheRetryExceeded {
740 attempts: usize,
741 #[snafu(implicit)]
742 location: Location,
743 },
744
745 #[cfg(feature = "pg_kvbackend")]
746 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
747 PostgresExecution {
748 sql: String,
749 #[snafu(source)]
750 error: tokio_postgres::Error,
751 #[snafu(implicit)]
752 location: Location,
753 },
754
755 #[cfg(feature = "pg_kvbackend")]
756 #[snafu(display("Failed to create connection pool for Postgres"))]
757 CreatePostgresPool {
758 #[snafu(source)]
759 error: deadpool_postgres::CreatePoolError,
760 #[snafu(implicit)]
761 location: Location,
762 },
763
764 #[cfg(feature = "pg_kvbackend")]
765 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
766 GetPostgresConnection {
767 reason: String,
768 #[snafu(implicit)]
769 location: Location,
770 },
771
772 #[cfg(feature = "pg_kvbackend")]
773 #[snafu(display("Failed to get Postgres client"))]
774 GetPostgresClient {
775 #[snafu(source)]
776 error: deadpool::managed::PoolError<tokio_postgres::Error>,
777 #[snafu(implicit)]
778 location: Location,
779 },
780
781 #[cfg(feature = "pg_kvbackend")]
782 #[snafu(display("Failed to {} Postgres transaction", operation))]
783 PostgresTransaction {
784 #[snafu(source)]
785 error: tokio_postgres::Error,
786 #[snafu(implicit)]
787 location: Location,
788 operation: String,
789 },
790
791 #[cfg(feature = "pg_kvbackend")]
792 #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
793 PostgresTlsConfig {
794 reason: String,
795 #[snafu(implicit)]
796 location: Location,
797 },
798
799 #[snafu(display("Failed to load TLS certificate from path: {}", path))]
800 LoadTlsCertificate {
801 path: String,
802 #[snafu(source)]
803 error: std::io::Error,
804 #[snafu(implicit)]
805 location: Location,
806 },
807
808 #[cfg(feature = "pg_kvbackend")]
809 #[snafu(display("Invalid TLS configuration: {}", reason))]
810 InvalidTlsConfig {
811 reason: String,
812 #[snafu(implicit)]
813 location: Location,
814 },
815
816 #[cfg(feature = "mysql_kvbackend")]
817 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
818 MySqlExecution {
819 sql: String,
820 #[snafu(source)]
821 error: sqlx::Error,
822 #[snafu(implicit)]
823 location: Location,
824 },
825
826 #[cfg(feature = "mysql_kvbackend")]
827 #[snafu(display("Failed to create connection pool for MySql"))]
828 CreateMySqlPool {
829 #[snafu(source)]
830 error: sqlx::Error,
831 #[snafu(implicit)]
832 location: Location,
833 },
834
835 #[cfg(feature = "mysql_kvbackend")]
836 #[snafu(display("Failed to decode sql value"))]
837 DecodeSqlValue {
838 #[snafu(source)]
839 error: sqlx::error::Error,
840 #[snafu(implicit)]
841 location: Location,
842 },
843
844 #[cfg(feature = "mysql_kvbackend")]
845 #[snafu(display("Failed to acquire mysql client from pool"))]
846 AcquireMySqlClient {
847 #[snafu(source)]
848 error: sqlx::Error,
849 #[snafu(implicit)]
850 location: Location,
851 },
852
853 #[cfg(feature = "mysql_kvbackend")]
854 #[snafu(display("Failed to {} MySql transaction", operation))]
855 MySqlTransaction {
856 #[snafu(source)]
857 error: sqlx::Error,
858 #[snafu(implicit)]
859 location: Location,
860 operation: String,
861 },
862
863 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
864 #[snafu(display("Rds transaction retry failed"))]
865 RdsTransactionRetryFailed {
866 #[snafu(implicit)]
867 location: Location,
868 },
869
870 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
871 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
872 SqlExecutionTimeout {
873 sql: String,
874 duration: std::time::Duration,
875 #[snafu(implicit)]
876 location: Location,
877 },
878
879 #[snafu(display(
880 "Datanode table info not found, table id: {}, datanode id: {}",
881 table_id,
882 datanode_id
883 ))]
884 DatanodeTableInfoNotFound {
885 datanode_id: DatanodeId,
886 table_id: TableId,
887 #[snafu(implicit)]
888 location: Location,
889 },
890
891 #[snafu(display("Invalid topic name prefix: {}", prefix))]
892 InvalidTopicNamePrefix {
893 prefix: String,
894 #[snafu(implicit)]
895 location: Location,
896 },
897
898 #[snafu(display("Failed to parse wal options: {}", wal_options))]
899 ParseWalOptions {
900 wal_options: String,
901 #[snafu(implicit)]
902 location: Location,
903 #[snafu(source)]
904 error: serde_json::Error,
905 },
906
907 #[snafu(display("No leader found for table_id: {}", table_id))]
908 NoLeader {
909 table_id: TableId,
910 #[snafu(implicit)]
911 location: Location,
912 },
913
914 #[snafu(display(
915 "Procedure poison key already exists with a different value, key: {}, value: {}",
916 key,
917 value
918 ))]
919 ProcedurePoisonConflict {
920 key: String,
921 value: String,
922 #[snafu(implicit)]
923 location: Location,
924 },
925
926 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
927 PutPoison {
928 #[snafu(implicit)]
929 location: Location,
930 #[snafu(source)]
931 source: common_procedure::error::Error,
932 },
933
934 #[snafu(display("Invalid file path: {}", file_path))]
935 InvalidFilePath {
936 #[snafu(implicit)]
937 location: Location,
938 file_path: String,
939 },
940
941 #[snafu(display("Failed to serialize flexbuffers"))]
942 SerializeFlexbuffers {
943 #[snafu(implicit)]
944 location: Location,
945 #[snafu(source)]
946 error: flexbuffers::SerializationError,
947 },
948
949 #[snafu(display("Failed to deserialize flexbuffers"))]
950 DeserializeFlexbuffers {
951 #[snafu(implicit)]
952 location: Location,
953 #[snafu(source)]
954 error: flexbuffers::DeserializationError,
955 },
956
957 #[snafu(display("Failed to read flexbuffers"))]
958 ReadFlexbuffers {
959 #[snafu(implicit)]
960 location: Location,
961 #[snafu(source)]
962 error: flexbuffers::ReaderError,
963 },
964
965 #[snafu(display("Invalid file name: {}", reason))]
966 InvalidFileName {
967 #[snafu(implicit)]
968 location: Location,
969 reason: String,
970 },
971
972 #[snafu(display("Invalid file extension: {}", reason))]
973 InvalidFileExtension {
974 #[snafu(implicit)]
975 location: Location,
976 reason: String,
977 },
978
979 #[snafu(display("Failed to write object, file path: {}", file_path))]
980 WriteObject {
981 #[snafu(implicit)]
982 location: Location,
983 file_path: String,
984 #[snafu(source)]
985 error: object_store::Error,
986 },
987
988 #[snafu(display("Failed to read object, file path: {}", file_path))]
989 ReadObject {
990 #[snafu(implicit)]
991 location: Location,
992 file_path: String,
993 #[snafu(source)]
994 error: object_store::Error,
995 },
996
997 #[snafu(display("Missing column ids"))]
998 MissingColumnIds {
999 #[snafu(implicit)]
1000 location: Location,
1001 },
1002
1003 #[snafu(display(
1004 "Missing column in column metadata: {}, table: {}, table_id: {}",
1005 column_name,
1006 table_name,
1007 table_id,
1008 ))]
1009 MissingColumnInColumnMetadata {
1010 column_name: String,
1011 #[snafu(implicit)]
1012 location: Location,
1013 table_name: String,
1014 table_id: TableId,
1015 },
1016
1017 #[snafu(display(
1018 "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
1019 column_name,
1020 column_id,
1021 table_name,
1022 table_id,
1023 ))]
1024 MismatchColumnId {
1025 column_name: String,
1026 column_id: u32,
1027 #[snafu(implicit)]
1028 location: Location,
1029 table_name: String,
1030 table_id: TableId,
1031 },
1032
1033 #[snafu(display("Failed to convert column def, column: {}", column))]
1034 ConvertColumnDef {
1035 column: String,
1036 #[snafu(implicit)]
1037 location: Location,
1038 source: api::error::Error,
1039 },
1040
1041 #[snafu(display("Failed to convert time ranges"))]
1042 ConvertTimeRanges {
1043 #[snafu(implicit)]
1044 location: Location,
1045 source: api::error::Error,
1046 },
1047
1048 #[snafu(display(
1049 "Column metadata inconsistencies found in table: {}, table_id: {}",
1050 table_name,
1051 table_id
1052 ))]
1053 ColumnMetadataConflicts {
1054 table_name: String,
1055 table_id: TableId,
1056 },
1057
1058 #[snafu(display(
1059 "Column not found in column metadata, column_name: {}, column_id: {}",
1060 column_name,
1061 column_id
1062 ))]
1063 ColumnNotFound { column_name: String, column_id: u32 },
1064
1065 #[snafu(display(
1066 "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1067 column_name,
1068 expected_column_id,
1069 actual_column_id
1070 ))]
1071 ColumnIdMismatch {
1072 column_name: String,
1073 expected_column_id: u32,
1074 actual_column_id: u32,
1075 },
1076
1077 #[snafu(display(
1078 "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1079 expected_column_name,
1080 expected_column_id,
1081 actual_column_name,
1082 actual_column_id,
1083 ))]
1084 TimestampMismatch {
1085 expected_column_name: String,
1086 expected_column_id: u32,
1087 actual_column_name: String,
1088 actual_column_id: u32,
1089 },
1090
1091 #[cfg(feature = "enterprise")]
1092 #[snafu(display("Too large duration"))]
1093 TooLargeDuration {
1094 #[snafu(source)]
1095 error: prost_types::DurationError,
1096 #[snafu(implicit)]
1097 location: Location,
1098 },
1099
1100 #[cfg(feature = "enterprise")]
1101 #[snafu(display("Negative duration"))]
1102 NegativeDuration {
1103 #[snafu(source)]
1104 error: prost_types::DurationError,
1105 #[snafu(implicit)]
1106 location: Location,
1107 },
1108
1109 #[cfg(feature = "enterprise")]
1110 #[snafu(display("Missing interval field"))]
1111 MissingInterval {
1112 #[snafu(implicit)]
1113 location: Location,
1114 },
1115}
1116
1117pub type Result<T> = std::result::Result<T, Error>;
1118
1119impl ErrorExt for Error {
1120 fn status_code(&self) -> StatusCode {
1121 use Error::*;
1122 match self {
1123 IllegalServerState { .. }
1124 | EtcdTxnOpResponse { .. }
1125 | EtcdFailed { .. }
1126 | EtcdTxnFailed { .. }
1127 | ConnectEtcd { .. }
1128 | MoveValues { .. }
1129 | GetCache { .. }
1130 | GetLatestCacheRetryExceeded { .. }
1131 | SerializeToJson { .. }
1132 | DeserializeFromJson { .. }
1133 | ElectionNoLeader { .. }
1134 | ElectionLeaderLeaseExpired { .. }
1135 | ElectionLeaderLeaseChanged { .. } => StatusCode::Internal,
1136
1137 NoLeader { .. } => StatusCode::TableUnavailable,
1138 ValueNotExist { .. }
1139 | ProcedurePoisonConflict { .. }
1140 | ProcedureStateReceiverNotFound { .. }
1141 | MissingColumnIds { .. }
1142 | MissingColumnInColumnMetadata { .. }
1143 | MismatchColumnId { .. }
1144 | ColumnMetadataConflicts { .. }
1145 | ColumnNotFound { .. }
1146 | ColumnIdMismatch { .. }
1147 | TimestampMismatch { .. } => StatusCode::Unexpected,
1148
1149 Unsupported { .. } => StatusCode::Unsupported,
1150 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1151
1152 SerdeJson { .. }
1153 | ParseOption { .. }
1154 | RouteInfoCorrupted { .. }
1155 | InvalidProtoMsg { .. }
1156 | InvalidMetadata { .. }
1157 | Unexpected { .. }
1158 | TableInfoNotFound { .. }
1159 | NextSequence { .. }
1160 | UnexpectedSequenceValue { .. }
1161 | InvalidHeartbeatResponse { .. }
1162 | EncodeJson { .. }
1163 | DecodeJson { .. }
1164 | PayloadNotExist { .. }
1165 | ConvertRawKey { .. }
1166 | DecodeProto { .. }
1167 | BuildTableMeta { .. }
1168 | TableRouteNotFound { .. }
1169 | TableRepartNotFound { .. }
1170 | RegionOperatingRace { .. }
1171 | EncodeWalOptions { .. }
1172 | BuildKafkaClient { .. }
1173 | BuildKafkaCtrlClient { .. }
1174 | KafkaPartitionClient { .. }
1175 | ProduceRecord { .. }
1176 | CreateKafkaWalTopic { .. }
1177 | EmptyTopicPool { .. }
1178 | UnexpectedLogicalRouteTable { .. }
1179 | ProcedureOutput { .. }
1180 | FromUtf8 { .. }
1181 | MetadataCorruption { .. }
1182 | ParseWalOptions { .. }
1183 | KafkaGetOffset { .. }
1184 | ReadFlexbuffers { .. }
1185 | SerializeFlexbuffers { .. }
1186 | DeserializeFlexbuffers { .. }
1187 | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1188
1189 GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1190
1191 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1192
1193 ProcedureNotFound { .. }
1194 | InvalidViewInfo { .. }
1195 | PrimaryKeyNotFound { .. }
1196 | EmptyKey { .. }
1197 | AlterLogicalTablesInvalidArguments { .. }
1198 | CreateLogicalTablesInvalidArguments { .. }
1199 | MismatchPrefix { .. }
1200 | TlsConfig { .. }
1201 | InvalidSetDatabaseOption { .. }
1202 | InvalidUnsetDatabaseOption { .. }
1203 | InvalidTopicNamePrefix { .. }
1204 | InvalidFileExtension { .. }
1205 | InvalidFileName { .. }
1206 | InvalidFlowRequestBody { .. }
1207 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1208
1209 #[cfg(feature = "enterprise")]
1210 MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1211 StatusCode::InvalidArguments
1212 }
1213
1214 FlowNotFound { .. } => StatusCode::FlowNotFound,
1215 FlowRouteNotFound { .. } => StatusCode::Unexpected,
1216 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1217
1218 ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1219 StatusCode::TableNotFound
1220 }
1221 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1222
1223 SubmitProcedure { source, .. }
1224 | QueryProcedure { source, .. }
1225 | WaitProcedure { source, .. }
1226 | StartProcedureManager { source, .. }
1227 | StopProcedureManager { source, .. } => source.status_code(),
1228 RegisterProcedureLoader { source, .. } => source.status_code(),
1229 External { source, .. } => source.status_code(),
1230 ResponseExceededSizeLimit { source, .. } => source.status_code(),
1231 OperateDatanode { source, .. } => source.status_code(),
1232 Table { source, .. } => source.status_code(),
1233 RetryLater { source, .. } => source.status_code(),
1234 AbortProcedure { source, .. } => source.status_code(),
1235 ConvertAlterTableRequest { source, .. } => source.status_code(),
1236 PutPoison { source, .. } => source.status_code(),
1237 ConvertColumnDef { source, .. } => source.status_code(),
1238 ProcedureStateReceiver { source, .. } => source.status_code(),
1239 RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
1240 CreateRepartitionProcedure { source, .. } => source.status_code(),
1241
1242 ParseProcedureId { .. }
1243 | InvalidNumTopics { .. }
1244 | SchemaNotFound { .. }
1245 | CatalogNotFound { .. }
1246 | InvalidNodeInfoKey { .. }
1247 | InvalidStatKey { .. }
1248 | ParseNum { .. }
1249 | InvalidRole { .. }
1250 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1251
1252 LoadTlsCertificate { .. } => StatusCode::Internal,
1253
1254 #[cfg(feature = "pg_kvbackend")]
1255 PostgresExecution { .. }
1256 | CreatePostgresPool { .. }
1257 | GetPostgresConnection { .. }
1258 | GetPostgresClient { .. }
1259 | PostgresTransaction { .. }
1260 | PostgresTlsConfig { .. }
1261 | InvalidTlsConfig { .. } => StatusCode::Internal,
1262 #[cfg(feature = "mysql_kvbackend")]
1263 MySqlExecution { .. }
1264 | CreateMySqlPool { .. }
1265 | DecodeSqlValue { .. }
1266 | AcquireMySqlClient { .. }
1267 | MySqlTransaction { .. } => StatusCode::Internal,
1268 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1269 RdsTransactionRetryFailed { .. } | SqlExecutionTimeout { .. } => StatusCode::Internal,
1270 DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1271 }
1272 }
1273
1274 fn as_any(&self) -> &dyn std::any::Any {
1275 self
1276 }
1277}
1278
1279impl Error {
1280 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1281 pub fn is_serialization_error(&self) -> bool {
1283 match self {
1284 #[cfg(feature = "pg_kvbackend")]
1285 Error::PostgresTransaction { error, .. } => {
1286 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1287 }
1288 #[cfg(feature = "pg_kvbackend")]
1289 Error::PostgresExecution { error, .. } => {
1290 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1291 }
1292 #[cfg(feature = "mysql_kvbackend")]
1293 Error::MySqlExecution {
1294 error: sqlx::Error::Database(database_error),
1295 ..
1296 } => {
1297 matches!(
1298 database_error.message(),
1299 "Deadlock found when trying to get lock; try restarting transaction"
1300 | "can't serialize access for this transaction"
1301 )
1302 }
1303 _ => false,
1304 }
1305 }
1306
1307 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1309 Error::RetryLater {
1310 source: BoxedError::new(err),
1311 clean_poisons: false,
1312 }
1313 }
1314
1315 pub fn is_retry_later(&self) -> bool {
1317 matches!(
1318 self,
1319 Error::RetryLater { .. } | Error::GetLatestCacheRetryExceeded { .. }
1320 )
1321 }
1322
1323 pub fn need_clean_poisons(&self) -> bool {
1325 matches!(
1326 self,
1327 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1328 ) || matches!(
1329 self,
1330 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1331 )
1332 }
1333
1334 pub fn is_exceeded_size_limit(&self) -> bool {
1336 match self {
1337 Error::EtcdFailed {
1338 error: etcd_client::Error::GRpcStatus(status),
1339 ..
1340 } => status.code() == tonic::Code::OutOfRange,
1341 Error::ResponseExceededSizeLimit { .. } => true,
1342 _ => false,
1343 }
1344 }
1345}