1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Empty key is not allowed"))]
36 EmptyKey {
37 #[snafu(implicit)]
38 location: Location,
39 },
40
41 #[snafu(display(
42 "Another procedure is operating the region: {} on peer: {}",
43 region_id,
44 peer_id
45 ))]
46 RegionOperatingRace {
47 #[snafu(implicit)]
48 location: Location,
49 peer_id: DatanodeId,
50 region_id: RegionId,
51 },
52
53 #[snafu(display("Failed to connect to Etcd"))]
54 ConnectEtcd {
55 #[snafu(source)]
56 error: etcd_client::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to execute via Etcd"))]
62 EtcdFailed {
63 #[snafu(source)]
64 error: etcd_client::Error,
65 #[snafu(implicit)]
66 location: Location,
67 },
68
69 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70 EtcdTxnFailed {
71 max_operations: usize,
72 #[snafu(source)]
73 error: etcd_client::Error,
74 #[snafu(implicit)]
75 location: Location,
76 },
77
78 #[snafu(display("Failed to get sequence: {}", err_msg))]
79 NextSequence {
80 err_msg: String,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Unexpected sequence value: {}", err_msg))]
86 UnexpectedSequenceValue {
87 err_msg: String,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Table info not found: {}", table))]
93 TableInfoNotFound {
94 table: String,
95 #[snafu(implicit)]
96 location: Location,
97 },
98
99 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100 RegisterProcedureLoader {
101 type_name: String,
102 #[snafu(implicit)]
103 location: Location,
104 source: common_procedure::error::Error,
105 },
106
107 #[snafu(display("Failed to submit procedure"))]
108 SubmitProcedure {
109 #[snafu(implicit)]
110 location: Location,
111 source: common_procedure::Error,
112 },
113
114 #[snafu(display("Failed to query procedure"))]
115 QueryProcedure {
116 #[snafu(implicit)]
117 location: Location,
118 source: common_procedure::Error,
119 },
120
121 #[snafu(display("Procedure not found: {pid}"))]
122 ProcedureNotFound {
123 #[snafu(implicit)]
124 location: Location,
125 pid: String,
126 },
127
128 #[snafu(display("Failed to parse procedure id: {key}"))]
129 ParseProcedureId {
130 #[snafu(implicit)]
131 location: Location,
132 key: String,
133 #[snafu(source)]
134 error: common_procedure::ParseIdError,
135 },
136
137 #[snafu(display("Unsupported operation {}", operation))]
138 Unsupported {
139 operation: String,
140 #[snafu(implicit)]
141 location: Location,
142 },
143
144 #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
145 ProcedureStateReceiver {
146 procedure_id: ProcedureId,
147 #[snafu(implicit)]
148 location: Location,
149 source: common_procedure::Error,
150 },
151
152 #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
153 ProcedureStateReceiverNotFound {
154 procedure_id: ProcedureId,
155 #[snafu(implicit)]
156 location: Location,
157 },
158
159 #[snafu(display("Failed to wait procedure done"))]
160 WaitProcedure {
161 #[snafu(implicit)]
162 location: Location,
163 source: common_procedure::Error,
164 },
165
166 #[snafu(display("Failed to start procedure manager"))]
167 StartProcedureManager {
168 #[snafu(implicit)]
169 location: Location,
170 source: common_procedure::Error,
171 },
172
173 #[snafu(display("Failed to stop procedure manager"))]
174 StopProcedureManager {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_procedure::Error,
178 },
179
180 #[snafu(display(
181 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
182 ))]
183 ProcedureOutput {
184 procedure_id: String,
185 err_msg: String,
186 #[snafu(implicit)]
187 location: Location,
188 },
189
190 #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
191 ConvertRawTableInfo {
192 #[snafu(implicit)]
193 location: Location,
194 source: datatypes::Error,
195 },
196
197 #[snafu(display("Primary key '{key}' not found when creating region request"))]
198 PrimaryKeyNotFound {
199 key: String,
200 #[snafu(implicit)]
201 location: Location,
202 },
203
204 #[snafu(display("Failed to build table meta for table: {}", table_name))]
205 BuildTableMeta {
206 table_name: String,
207 #[snafu(source)]
208 error: table::metadata::TableMetaBuilderError,
209 #[snafu(implicit)]
210 location: Location,
211 },
212
213 #[snafu(display("Table occurs error"))]
214 Table {
215 #[snafu(implicit)]
216 location: Location,
217 source: table::error::Error,
218 },
219
220 #[snafu(display("Failed to find table route for table id {}", table_id))]
221 TableRouteNotFound {
222 table_id: TableId,
223 #[snafu(implicit)]
224 location: Location,
225 },
226
227 #[snafu(display("Failed to decode protobuf"))]
228 DecodeProto {
229 #[snafu(implicit)]
230 location: Location,
231 #[snafu(source)]
232 error: prost::DecodeError,
233 },
234
235 #[snafu(display("Failed to encode object into json"))]
236 EncodeJson {
237 #[snafu(implicit)]
238 location: Location,
239 #[snafu(source)]
240 error: JsonError,
241 },
242
243 #[snafu(display("Failed to decode object from json"))]
244 DecodeJson {
245 #[snafu(implicit)]
246 location: Location,
247 #[snafu(source)]
248 error: JsonError,
249 },
250
251 #[snafu(display("Failed to serialize to json: {}", input))]
252 SerializeToJson {
253 input: String,
254 #[snafu(source)]
255 error: serde_json::error::Error,
256 #[snafu(implicit)]
257 location: Location,
258 },
259
260 #[snafu(display("Failed to deserialize from json: {}", input))]
261 DeserializeFromJson {
262 input: String,
263 #[snafu(source)]
264 error: serde_json::error::Error,
265 #[snafu(implicit)]
266 location: Location,
267 },
268
269 #[snafu(display("Payload not exist"))]
270 PayloadNotExist {
271 #[snafu(implicit)]
272 location: Location,
273 },
274
275 #[snafu(display("Failed to serde json"))]
276 SerdeJson {
277 #[snafu(source)]
278 error: serde_json::error::Error,
279 #[snafu(implicit)]
280 location: Location,
281 },
282
283 #[snafu(display("Failed to parse value {} into key {}", value, key))]
284 ParseOption {
285 key: String,
286 value: String,
287 #[snafu(implicit)]
288 location: Location,
289 },
290
291 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
292 RouteInfoCorrupted {
293 err_msg: String,
294 #[snafu(implicit)]
295 location: Location,
296 },
297
298 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
299 IllegalServerState {
300 code: i32,
301 err_msg: String,
302 #[snafu(implicit)]
303 location: Location,
304 },
305
306 #[snafu(display("Failed to convert alter table request"))]
307 ConvertAlterTableRequest {
308 source: common_grpc_expr::error::Error,
309 #[snafu(implicit)]
310 location: Location,
311 },
312
313 #[snafu(display("Invalid protobuf message: {err_msg}"))]
314 InvalidProtoMsg {
315 err_msg: String,
316 #[snafu(implicit)]
317 location: Location,
318 },
319
320 #[snafu(display("Unexpected: {err_msg}"))]
321 Unexpected {
322 err_msg: String,
323 #[snafu(implicit)]
324 location: Location,
325 },
326
327 #[snafu(display("Table already exists, table: {}", table_name))]
328 TableAlreadyExists {
329 table_name: String,
330 #[snafu(implicit)]
331 location: Location,
332 },
333
334 #[snafu(display("View already exists, view: {}", view_name))]
335 ViewAlreadyExists {
336 view_name: String,
337 #[snafu(implicit)]
338 location: Location,
339 },
340
341 #[snafu(display("Flow already exists: {}", flow_name))]
342 FlowAlreadyExists {
343 flow_name: String,
344 #[snafu(implicit)]
345 location: Location,
346 },
347
348 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
349 SchemaAlreadyExists {
350 catalog: String,
351 schema: String,
352 #[snafu(implicit)]
353 location: Location,
354 },
355
356 #[snafu(display("Failed to convert raw key to str"))]
357 ConvertRawKey {
358 #[snafu(implicit)]
359 location: Location,
360 #[snafu(source)]
361 error: Utf8Error,
362 },
363
364 #[snafu(display("Table not found: '{}'", table_name))]
365 TableNotFound {
366 table_name: String,
367 #[snafu(implicit)]
368 location: Location,
369 },
370
371 #[snafu(display("Region not found: {}", region_id))]
372 RegionNotFound {
373 region_id: RegionId,
374 #[snafu(implicit)]
375 location: Location,
376 },
377
378 #[snafu(display("View not found: '{}'", view_name))]
379 ViewNotFound {
380 view_name: String,
381 #[snafu(implicit)]
382 location: Location,
383 },
384
385 #[snafu(display("Flow not found: '{}'", flow_name))]
386 FlowNotFound {
387 flow_name: String,
388 #[snafu(implicit)]
389 location: Location,
390 },
391
392 #[snafu(display("Flow route not found: '{}'", flow_name))]
393 FlowRouteNotFound {
394 flow_name: String,
395 #[snafu(implicit)]
396 location: Location,
397 },
398
399 #[snafu(display("Schema nod found, schema: {}", table_schema))]
400 SchemaNotFound {
401 table_schema: String,
402 #[snafu(implicit)]
403 location: Location,
404 },
405
406 #[snafu(display("Catalog not found, catalog: {}", catalog))]
407 CatalogNotFound {
408 catalog: String,
409 #[snafu(implicit)]
410 location: Location,
411 },
412
413 #[snafu(display("Invalid metadata, err: {}", err_msg))]
414 InvalidMetadata {
415 err_msg: String,
416 #[snafu(implicit)]
417 location: Location,
418 },
419
420 #[snafu(display("Invalid view info, err: {}", err_msg))]
421 InvalidViewInfo {
422 err_msg: String,
423 #[snafu(implicit)]
424 location: Location,
425 },
426
427 #[snafu(display("Invalid flow request body: {:?}", body))]
428 InvalidFlowRequestBody {
429 body: Box<Option<api::v1::flow::flow_request::Body>>,
430 #[snafu(implicit)]
431 location: Location,
432 },
433
434 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
435 GetKvCache { err_msg: String },
436
437 #[snafu(display("Get null from cache, key: {}", key))]
438 CacheNotGet {
439 key: String,
440 #[snafu(implicit)]
441 location: Location,
442 },
443
444 #[snafu(display("Etcd txn error: {err_msg}"))]
445 EtcdTxnOpResponse {
446 err_msg: String,
447 #[snafu(implicit)]
448 location: Location,
449 },
450
451 #[snafu(display("External error"))]
452 External {
453 #[snafu(implicit)]
454 location: Location,
455 source: BoxedError,
456 },
457
458 #[snafu(display("The response exceeded size limit"))]
459 ResponseExceededSizeLimit {
460 #[snafu(implicit)]
461 location: Location,
462 source: BoxedError,
463 },
464
465 #[snafu(display("Invalid heartbeat response"))]
466 InvalidHeartbeatResponse {
467 #[snafu(implicit)]
468 location: Location,
469 },
470
471 #[snafu(display("Failed to operate on datanode: {}", peer))]
472 OperateDatanode {
473 #[snafu(implicit)]
474 location: Location,
475 peer: Peer,
476 source: BoxedError,
477 },
478
479 #[snafu(display("Retry later"))]
480 RetryLater {
481 source: BoxedError,
482 clean_poisons: bool,
483 },
484
485 #[snafu(display("Abort procedure"))]
486 AbortProcedure {
487 #[snafu(implicit)]
488 location: Location,
489 source: BoxedError,
490 clean_poisons: bool,
491 },
492
493 #[snafu(display(
494 "Failed to encode a wal options to json string, wal_options: {:?}",
495 wal_options
496 ))]
497 EncodeWalOptions {
498 wal_options: WalOptions,
499 #[snafu(source)]
500 error: serde_json::Error,
501 #[snafu(implicit)]
502 location: Location,
503 },
504
505 #[snafu(display("Invalid number of topics {}", num_topics))]
506 InvalidNumTopics {
507 num_topics: usize,
508 #[snafu(implicit)]
509 location: Location,
510 },
511
512 #[snafu(display(
513 "Failed to build a Kafka client, broker endpoints: {:?}",
514 broker_endpoints
515 ))]
516 BuildKafkaClient {
517 broker_endpoints: Vec<String>,
518 #[snafu(implicit)]
519 location: Location,
520 #[snafu(source)]
521 error: rskafka::client::error::Error,
522 },
523
524 #[snafu(display("Failed to create TLS Config"))]
525 TlsConfig {
526 #[snafu(implicit)]
527 location: Location,
528 source: common_wal::error::Error,
529 },
530
531 #[snafu(display("Failed to build a Kafka controller client"))]
532 BuildKafkaCtrlClient {
533 #[snafu(implicit)]
534 location: Location,
535 #[snafu(source)]
536 error: rskafka::client::error::Error,
537 },
538
539 #[snafu(display(
540 "Failed to get a Kafka partition client, topic: {}, partition: {}",
541 topic,
542 partition
543 ))]
544 KafkaPartitionClient {
545 topic: String,
546 partition: i32,
547 #[snafu(implicit)]
548 location: Location,
549 #[snafu(source)]
550 error: rskafka::client::error::Error,
551 },
552
553 #[snafu(display(
554 "Failed to get offset from Kafka, topic: {}, partition: {}",
555 topic,
556 partition
557 ))]
558 KafkaGetOffset {
559 topic: String,
560 partition: i32,
561 #[snafu(implicit)]
562 location: Location,
563 #[snafu(source)]
564 error: rskafka::client::error::Error,
565 },
566
567 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
568 ProduceRecord {
569 topic: String,
570 #[snafu(implicit)]
571 location: Location,
572 #[snafu(source)]
573 error: rskafka::client::error::Error,
574 },
575
576 #[snafu(display("Failed to create a Kafka wal topic"))]
577 CreateKafkaWalTopic {
578 #[snafu(implicit)]
579 location: Location,
580 #[snafu(source)]
581 error: rskafka::client::error::Error,
582 },
583
584 #[snafu(display("The topic pool is empty"))]
585 EmptyTopicPool {
586 #[snafu(implicit)]
587 location: Location,
588 },
589
590 #[snafu(display("Unexpected table route type: {}", err_msg))]
591 UnexpectedLogicalRouteTable {
592 #[snafu(implicit)]
593 location: Location,
594 err_msg: String,
595 },
596
597 #[snafu(display("The tasks of {} cannot be empty", name))]
598 EmptyDdlTasks {
599 name: String,
600 #[snafu(implicit)]
601 location: Location,
602 },
603
604 #[snafu(display("Metadata corruption: {}", err_msg))]
605 MetadataCorruption {
606 err_msg: String,
607 #[snafu(implicit)]
608 location: Location,
609 },
610
611 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
612 AlterLogicalTablesInvalidArguments {
613 err_msg: String,
614 #[snafu(implicit)]
615 location: Location,
616 },
617
618 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
619 CreateLogicalTablesInvalidArguments {
620 err_msg: String,
621 #[snafu(implicit)]
622 location: Location,
623 },
624
625 #[snafu(display("Invalid node info key: {}", key))]
626 InvalidNodeInfoKey {
627 key: String,
628 #[snafu(implicit)]
629 location: Location,
630 },
631
632 #[snafu(display("Invalid node stat key: {}", key))]
633 InvalidStatKey {
634 key: String,
635 #[snafu(implicit)]
636 location: Location,
637 },
638
639 #[snafu(display("Failed to parse number: {}", err_msg))]
640 ParseNum {
641 err_msg: String,
642 #[snafu(source)]
643 error: std::num::ParseIntError,
644 #[snafu(implicit)]
645 location: Location,
646 },
647
648 #[snafu(display("Invalid role: {}", role))]
649 InvalidRole {
650 role: i32,
651 #[snafu(implicit)]
652 location: Location,
653 },
654
655 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
656 InvalidSetDatabaseOption {
657 key: String,
658 value: String,
659 #[snafu(implicit)]
660 location: Location,
661 },
662
663 #[snafu(display("Invalid unset database option, key: {}", key))]
664 InvalidUnsetDatabaseOption {
665 key: String,
666 #[snafu(implicit)]
667 location: Location,
668 },
669
670 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
671 MismatchPrefix {
672 prefix: String,
673 key: String,
674 #[snafu(implicit)]
675 location: Location,
676 },
677
678 #[snafu(display("Failed to move values: {err_msg}"))]
679 MoveValues {
680 err_msg: String,
681 #[snafu(implicit)]
682 location: Location,
683 },
684
685 #[snafu(display("Failed to parse {} from utf8", name))]
686 FromUtf8 {
687 name: String,
688 #[snafu(source)]
689 error: std::string::FromUtf8Error,
690 #[snafu(implicit)]
691 location: Location,
692 },
693
694 #[snafu(display("Value not exists"))]
695 ValueNotExist {
696 #[snafu(implicit)]
697 location: Location,
698 },
699
700 #[snafu(display("Failed to get cache"))]
701 GetCache { source: Arc<Error> },
702
703 #[cfg(feature = "pg_kvbackend")]
704 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
705 PostgresExecution {
706 sql: String,
707 #[snafu(source)]
708 error: tokio_postgres::Error,
709 #[snafu(implicit)]
710 location: Location,
711 },
712
713 #[cfg(feature = "pg_kvbackend")]
714 #[snafu(display("Failed to create connection pool for Postgres"))]
715 CreatePostgresPool {
716 #[snafu(source)]
717 error: deadpool_postgres::CreatePoolError,
718 #[snafu(implicit)]
719 location: Location,
720 },
721
722 #[cfg(feature = "pg_kvbackend")]
723 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
724 GetPostgresConnection {
725 reason: String,
726 #[snafu(implicit)]
727 location: Location,
728 },
729
730 #[cfg(feature = "pg_kvbackend")]
731 #[snafu(display("Failed to {} Postgres transaction", operation))]
732 PostgresTransaction {
733 #[snafu(source)]
734 error: tokio_postgres::Error,
735 #[snafu(implicit)]
736 location: Location,
737 operation: String,
738 },
739
740 #[cfg(feature = "pg_kvbackend")]
741 #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
742 PostgresTlsConfig {
743 reason: String,
744 #[snafu(implicit)]
745 location: Location,
746 },
747
748 #[snafu(display("Failed to load TLS certificate from path: {}", path))]
749 LoadTlsCertificate {
750 path: String,
751 #[snafu(source)]
752 error: std::io::Error,
753 #[snafu(implicit)]
754 location: Location,
755 },
756
757 #[cfg(feature = "pg_kvbackend")]
758 #[snafu(display("Invalid TLS configuration: {}", reason))]
759 InvalidTlsConfig {
760 reason: String,
761 #[snafu(implicit)]
762 location: Location,
763 },
764
765 #[cfg(feature = "mysql_kvbackend")]
766 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
767 MySqlExecution {
768 sql: String,
769 #[snafu(source)]
770 error: sqlx::Error,
771 #[snafu(implicit)]
772 location: Location,
773 },
774
775 #[cfg(feature = "mysql_kvbackend")]
776 #[snafu(display("Failed to create connection pool for MySql"))]
777 CreateMySqlPool {
778 #[snafu(source)]
779 error: sqlx::Error,
780 #[snafu(implicit)]
781 location: Location,
782 },
783
784 #[cfg(feature = "mysql_kvbackend")]
785 #[snafu(display("Failed to {} MySql transaction", operation))]
786 MySqlTransaction {
787 #[snafu(source)]
788 error: sqlx::Error,
789 #[snafu(implicit)]
790 location: Location,
791 operation: String,
792 },
793
794 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
795 #[snafu(display("Rds transaction retry failed"))]
796 RdsTransactionRetryFailed {
797 #[snafu(implicit)]
798 location: Location,
799 },
800
801 #[snafu(display(
802 "Datanode table info not found, table id: {}, datanode id: {}",
803 table_id,
804 datanode_id
805 ))]
806 DatanodeTableInfoNotFound {
807 datanode_id: DatanodeId,
808 table_id: TableId,
809 #[snafu(implicit)]
810 location: Location,
811 },
812
813 #[snafu(display("Invalid topic name prefix: {}", prefix))]
814 InvalidTopicNamePrefix {
815 prefix: String,
816 #[snafu(implicit)]
817 location: Location,
818 },
819
820 #[snafu(display("Failed to parse wal options: {}", wal_options))]
821 ParseWalOptions {
822 wal_options: String,
823 #[snafu(implicit)]
824 location: Location,
825 #[snafu(source)]
826 error: serde_json::Error,
827 },
828
829 #[snafu(display("No leader found for table_id: {}", table_id))]
830 NoLeader {
831 table_id: TableId,
832 #[snafu(implicit)]
833 location: Location,
834 },
835
836 #[snafu(display(
837 "Procedure poison key already exists with a different value, key: {}, value: {}",
838 key,
839 value
840 ))]
841 ProcedurePoisonConflict {
842 key: String,
843 value: String,
844 #[snafu(implicit)]
845 location: Location,
846 },
847
848 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
849 PutPoison {
850 #[snafu(implicit)]
851 location: Location,
852 #[snafu(source)]
853 source: common_procedure::error::Error,
854 },
855
856 #[snafu(display("Failed to parse timezone"))]
857 InvalidTimeZone {
858 #[snafu(implicit)]
859 location: Location,
860 #[snafu(source)]
861 error: common_time::error::Error,
862 },
863 #[snafu(display("Invalid file path: {}", file_path))]
864 InvalidFilePath {
865 #[snafu(implicit)]
866 location: Location,
867 file_path: String,
868 },
869
870 #[snafu(display("Failed to serialize flexbuffers"))]
871 SerializeFlexbuffers {
872 #[snafu(implicit)]
873 location: Location,
874 #[snafu(source)]
875 error: flexbuffers::SerializationError,
876 },
877
878 #[snafu(display("Failed to deserialize flexbuffers"))]
879 DeserializeFlexbuffers {
880 #[snafu(implicit)]
881 location: Location,
882 #[snafu(source)]
883 error: flexbuffers::DeserializationError,
884 },
885
886 #[snafu(display("Failed to read flexbuffers"))]
887 ReadFlexbuffers {
888 #[snafu(implicit)]
889 location: Location,
890 #[snafu(source)]
891 error: flexbuffers::ReaderError,
892 },
893
894 #[snafu(display("Invalid file name: {}", reason))]
895 InvalidFileName {
896 #[snafu(implicit)]
897 location: Location,
898 reason: String,
899 },
900
901 #[snafu(display("Invalid file extension: {}", reason))]
902 InvalidFileExtension {
903 #[snafu(implicit)]
904 location: Location,
905 reason: String,
906 },
907
908 #[snafu(display("Failed to write object, file path: {}", file_path))]
909 WriteObject {
910 #[snafu(implicit)]
911 location: Location,
912 file_path: String,
913 #[snafu(source)]
914 error: object_store::Error,
915 },
916
917 #[snafu(display("Failed to read object, file path: {}", file_path))]
918 ReadObject {
919 #[snafu(implicit)]
920 location: Location,
921 file_path: String,
922 #[snafu(source)]
923 error: object_store::Error,
924 },
925
926 #[snafu(display("Missing column ids"))]
927 MissingColumnIds {
928 #[snafu(implicit)]
929 location: Location,
930 },
931
932 #[snafu(display(
933 "Missing column in column metadata: {}, table: {}, table_id: {}",
934 column_name,
935 table_name,
936 table_id,
937 ))]
938 MissingColumnInColumnMetadata {
939 column_name: String,
940 #[snafu(implicit)]
941 location: Location,
942 table_name: String,
943 table_id: TableId,
944 },
945
946 #[snafu(display(
947 "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
948 column_name,
949 column_id,
950 table_name,
951 table_id,
952 ))]
953 MismatchColumnId {
954 column_name: String,
955 column_id: u32,
956 #[snafu(implicit)]
957 location: Location,
958 table_name: String,
959 table_id: TableId,
960 },
961
962 #[snafu(display("Failed to convert column def, column: {}", column))]
963 ConvertColumnDef {
964 column: String,
965 #[snafu(implicit)]
966 location: Location,
967 source: api::error::Error,
968 },
969
970 #[snafu(display("Failed to convert time ranges"))]
971 ConvertTimeRanges {
972 #[snafu(implicit)]
973 location: Location,
974 source: api::error::Error,
975 },
976
977 #[snafu(display(
978 "Column metadata inconsistencies found in table: {}, table_id: {}",
979 table_name,
980 table_id
981 ))]
982 ColumnMetadataConflicts {
983 table_name: String,
984 table_id: TableId,
985 },
986
987 #[snafu(display(
988 "Column not found in column metadata, column_name: {}, column_id: {}",
989 column_name,
990 column_id
991 ))]
992 ColumnNotFound { column_name: String, column_id: u32 },
993
994 #[snafu(display(
995 "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
996 column_name,
997 expected_column_id,
998 actual_column_id
999 ))]
1000 ColumnIdMismatch {
1001 column_name: String,
1002 expected_column_id: u32,
1003 actual_column_id: u32,
1004 },
1005
1006 #[snafu(display(
1007 "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1008 expected_column_name,
1009 expected_column_id,
1010 actual_column_name,
1011 actual_column_id,
1012 ))]
1013 TimestampMismatch {
1014 expected_column_name: String,
1015 expected_column_id: u32,
1016 actual_column_name: String,
1017 actual_column_id: u32,
1018 },
1019
1020 #[cfg(feature = "enterprise")]
1021 #[snafu(display("Too large duration"))]
1022 TooLargeDuration {
1023 #[snafu(source)]
1024 error: prost_types::DurationError,
1025 #[snafu(implicit)]
1026 location: Location,
1027 },
1028
1029 #[cfg(feature = "enterprise")]
1030 #[snafu(display("Negative duration"))]
1031 NegativeDuration {
1032 #[snafu(source)]
1033 error: prost_types::DurationError,
1034 #[snafu(implicit)]
1035 location: Location,
1036 },
1037
1038 #[cfg(feature = "enterprise")]
1039 #[snafu(display("Missing interval field"))]
1040 MissingInterval {
1041 #[snafu(implicit)]
1042 location: Location,
1043 },
1044}
1045
1046pub type Result<T> = std::result::Result<T, Error>;
1047
1048impl ErrorExt for Error {
1049 fn status_code(&self) -> StatusCode {
1050 use Error::*;
1051 match self {
1052 IllegalServerState { .. }
1053 | EtcdTxnOpResponse { .. }
1054 | EtcdFailed { .. }
1055 | EtcdTxnFailed { .. }
1056 | ConnectEtcd { .. }
1057 | MoveValues { .. }
1058 | GetCache { .. }
1059 | SerializeToJson { .. }
1060 | DeserializeFromJson { .. } => StatusCode::Internal,
1061
1062 NoLeader { .. } => StatusCode::TableUnavailable,
1063 ValueNotExist { .. }
1064 | ProcedurePoisonConflict { .. }
1065 | ProcedureStateReceiverNotFound { .. }
1066 | MissingColumnIds { .. }
1067 | MissingColumnInColumnMetadata { .. }
1068 | MismatchColumnId { .. }
1069 | ColumnMetadataConflicts { .. }
1070 | ColumnNotFound { .. }
1071 | ColumnIdMismatch { .. }
1072 | TimestampMismatch { .. } => StatusCode::Unexpected,
1073
1074 Unsupported { .. } => StatusCode::Unsupported,
1075 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1076
1077 SerdeJson { .. }
1078 | ParseOption { .. }
1079 | RouteInfoCorrupted { .. }
1080 | InvalidProtoMsg { .. }
1081 | InvalidMetadata { .. }
1082 | Unexpected { .. }
1083 | TableInfoNotFound { .. }
1084 | NextSequence { .. }
1085 | UnexpectedSequenceValue { .. }
1086 | InvalidHeartbeatResponse { .. }
1087 | EncodeJson { .. }
1088 | DecodeJson { .. }
1089 | PayloadNotExist { .. }
1090 | ConvertRawKey { .. }
1091 | DecodeProto { .. }
1092 | BuildTableMeta { .. }
1093 | TableRouteNotFound { .. }
1094 | ConvertRawTableInfo { .. }
1095 | RegionOperatingRace { .. }
1096 | EncodeWalOptions { .. }
1097 | BuildKafkaClient { .. }
1098 | BuildKafkaCtrlClient { .. }
1099 | KafkaPartitionClient { .. }
1100 | ProduceRecord { .. }
1101 | CreateKafkaWalTopic { .. }
1102 | EmptyTopicPool { .. }
1103 | UnexpectedLogicalRouteTable { .. }
1104 | ProcedureOutput { .. }
1105 | FromUtf8 { .. }
1106 | MetadataCorruption { .. }
1107 | ParseWalOptions { .. }
1108 | KafkaGetOffset { .. }
1109 | ReadFlexbuffers { .. }
1110 | SerializeFlexbuffers { .. }
1111 | DeserializeFlexbuffers { .. }
1112 | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1113
1114 GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1115
1116 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1117
1118 ProcedureNotFound { .. }
1119 | InvalidViewInfo { .. }
1120 | PrimaryKeyNotFound { .. }
1121 | EmptyKey { .. }
1122 | AlterLogicalTablesInvalidArguments { .. }
1123 | CreateLogicalTablesInvalidArguments { .. }
1124 | MismatchPrefix { .. }
1125 | TlsConfig { .. }
1126 | InvalidSetDatabaseOption { .. }
1127 | InvalidUnsetDatabaseOption { .. }
1128 | InvalidTopicNamePrefix { .. }
1129 | InvalidTimeZone { .. }
1130 | InvalidFileExtension { .. }
1131 | InvalidFileName { .. }
1132 | InvalidFlowRequestBody { .. }
1133 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1134
1135 #[cfg(feature = "enterprise")]
1136 MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1137 StatusCode::InvalidArguments
1138 }
1139
1140 FlowNotFound { .. } => StatusCode::FlowNotFound,
1141 FlowRouteNotFound { .. } => StatusCode::Unexpected,
1142 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1143
1144 ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1145 StatusCode::TableNotFound
1146 }
1147 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1148
1149 SubmitProcedure { source, .. }
1150 | QueryProcedure { source, .. }
1151 | WaitProcedure { source, .. }
1152 | StartProcedureManager { source, .. }
1153 | StopProcedureManager { source, .. } => source.status_code(),
1154 RegisterProcedureLoader { source, .. } => source.status_code(),
1155 External { source, .. } => source.status_code(),
1156 ResponseExceededSizeLimit { source, .. } => source.status_code(),
1157 OperateDatanode { source, .. } => source.status_code(),
1158 Table { source, .. } => source.status_code(),
1159 RetryLater { source, .. } => source.status_code(),
1160 AbortProcedure { source, .. } => source.status_code(),
1161 ConvertAlterTableRequest { source, .. } => source.status_code(),
1162 PutPoison { source, .. } => source.status_code(),
1163 ConvertColumnDef { source, .. } => source.status_code(),
1164 ProcedureStateReceiver { source, .. } => source.status_code(),
1165
1166 ParseProcedureId { .. }
1167 | InvalidNumTopics { .. }
1168 | SchemaNotFound { .. }
1169 | CatalogNotFound { .. }
1170 | InvalidNodeInfoKey { .. }
1171 | InvalidStatKey { .. }
1172 | ParseNum { .. }
1173 | InvalidRole { .. }
1174 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1175
1176 LoadTlsCertificate { .. } => StatusCode::Internal,
1177
1178 #[cfg(feature = "pg_kvbackend")]
1179 PostgresExecution { .. }
1180 | CreatePostgresPool { .. }
1181 | GetPostgresConnection { .. }
1182 | PostgresTransaction { .. }
1183 | PostgresTlsConfig { .. }
1184 | InvalidTlsConfig { .. } => StatusCode::Internal,
1185 #[cfg(feature = "mysql_kvbackend")]
1186 MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1187 StatusCode::Internal
1188 }
1189 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1190 RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1191 DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1192 }
1193 }
1194
1195 fn as_any(&self) -> &dyn std::any::Any {
1196 self
1197 }
1198}
1199
1200impl Error {
1201 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1202 pub fn is_serialization_error(&self) -> bool {
1204 match self {
1205 #[cfg(feature = "pg_kvbackend")]
1206 Error::PostgresTransaction { error, .. } => {
1207 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1208 }
1209 #[cfg(feature = "pg_kvbackend")]
1210 Error::PostgresExecution { error, .. } => {
1211 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1212 }
1213 #[cfg(feature = "mysql_kvbackend")]
1214 Error::MySqlExecution {
1215 error: sqlx::Error::Database(database_error),
1216 ..
1217 } => {
1218 matches!(
1219 database_error.message(),
1220 "Deadlock found when trying to get lock; try restarting transaction"
1221 | "can't serialize access for this transaction"
1222 )
1223 }
1224 _ => false,
1225 }
1226 }
1227
1228 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1230 Error::RetryLater {
1231 source: BoxedError::new(err),
1232 clean_poisons: false,
1233 }
1234 }
1235
1236 pub fn is_retry_later(&self) -> bool {
1238 matches!(self, Error::RetryLater { .. })
1239 }
1240
1241 pub fn need_clean_poisons(&self) -> bool {
1243 matches!(
1244 self,
1245 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1246 ) || matches!(
1247 self,
1248 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1249 )
1250 }
1251
1252 pub fn is_exceeded_size_limit(&self) -> bool {
1254 match self {
1255 Error::EtcdFailed {
1256 error: etcd_client::Error::GRpcStatus(status),
1257 ..
1258 } => status.code() == tonic::Code::OutOfRange,
1259 Error::ResponseExceededSizeLimit { .. } => true,
1260 _ => false,
1261 }
1262 }
1263}