1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt, RetryHint};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_procedure::ProcedureId;
21use common_runtime::JoinError;
22use common_wal::kafka::rskafka_client_error_to_retry_hint;
23use snafu::{Location, Snafu};
24use store_api::storage::RegionId;
25use table::metadata::TableId;
26use tokio::sync::mpsc::error::SendError;
27use tonic::codegen::http;
28use uuid::Uuid;
29
30use crate::metasrv::SelectTarget;
31use crate::pubsub::Message;
32use crate::service::mailbox::Channel;
33
34#[derive(Snafu)]
35#[snafu(visibility(pub))]
36#[stack_trace_debug]
37pub enum Error {
38 #[snafu(display("Failed to choose items"))]
39 ChooseItems {
40 #[snafu(implicit)]
41 location: Location,
42 #[snafu(source)]
43 error: rand::distr::weighted::Error,
44 },
45
46 #[snafu(display("Exceeded deadline, operation: {}", operation))]
47 ExceededDeadline {
48 #[snafu(implicit)]
49 location: Location,
50 operation: String,
51 },
52
53 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
54 PeerUnavailable {
55 #[snafu(implicit)]
56 location: Location,
57 peer_id: u64,
58 },
59
60 #[snafu(display("Failed to list active frontends"))]
61 ListActiveFrontends {
62 #[snafu(implicit)]
63 location: Location,
64 source: common_meta::error::Error,
65 },
66
67 #[snafu(display("Failed to list active datanodes"))]
68 ListActiveDatanodes {
69 #[snafu(implicit)]
70 location: Location,
71 source: common_meta::error::Error,
72 },
73
74 #[snafu(display("Failed to list active flownodes"))]
75 ListActiveFlownodes {
76 #[snafu(implicit)]
77 location: Location,
78 source: common_meta::error::Error,
79 },
80
81 #[snafu(display("No available frontend"))]
82 NoAvailableFrontend {
83 #[snafu(implicit)]
84 location: Location,
85 },
86
87 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
88 MigrationRunning {
89 #[snafu(implicit)]
90 location: Location,
91 region_id: RegionId,
92 },
93
94 #[snafu(display(
95 "The region migration procedure is completed for region: {}, target_peer: {}",
96 region_id,
97 target_peer_id
98 ))]
99 RegionMigrated {
100 #[snafu(implicit)]
101 location: Location,
102 region_id: RegionId,
103 target_peer_id: u64,
104 },
105
106 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
107 MigrationAbort {
108 #[snafu(implicit)]
109 location: Location,
110 reason: String,
111 },
112
113 #[snafu(display(
114 "Another procedure is operating the region: {} on peer: {}",
115 region_id,
116 peer_id
117 ))]
118 RegionOperatingRace {
119 #[snafu(implicit)]
120 location: Location,
121 peer_id: DatanodeId,
122 region_id: RegionId,
123 },
124
125 #[snafu(display("Failed to init ddl manager"))]
126 InitDdlManager {
127 #[snafu(implicit)]
128 location: Location,
129 source: common_meta::error::Error,
130 },
131
132 #[snafu(display("Failed to init reconciliation manager"))]
133 InitReconciliationManager {
134 #[snafu(implicit)]
135 location: Location,
136 source: common_meta::error::Error,
137 },
138
139 #[snafu(display("Failed to create default catalog and schema"))]
140 InitMetadata {
141 #[snafu(implicit)]
142 location: Location,
143 source: common_meta::error::Error,
144 },
145
146 #[snafu(display("Failed to allocate next sequence number"))]
147 NextSequence {
148 #[snafu(implicit)]
149 location: Location,
150 source: common_meta::error::Error,
151 },
152
153 #[snafu(display("Failed to set next sequence number"))]
154 SetNextSequence {
155 #[snafu(implicit)]
156 location: Location,
157 source: common_meta::error::Error,
158 },
159
160 #[snafu(display("Failed to peek sequence number"))]
161 PeekSequence {
162 #[snafu(implicit)]
163 location: Location,
164 source: common_meta::error::Error,
165 },
166
167 #[snafu(display("Failed to start telemetry task"))]
168 StartTelemetryTask {
169 #[snafu(implicit)]
170 location: Location,
171 source: common_runtime::error::Error,
172 },
173
174 #[snafu(display("Failed to submit ddl task"))]
175 SubmitDdlTask {
176 #[snafu(implicit)]
177 location: Location,
178 source: common_meta::error::Error,
179 },
180
181 #[snafu(display("Failed to submit reconcile procedure"))]
182 SubmitReconcileProcedure {
183 #[snafu(implicit)]
184 location: Location,
185 source: common_meta::error::Error,
186 },
187
188 #[snafu(display("Failed to invalidate table cache"))]
189 InvalidateTableCache {
190 #[snafu(implicit)]
191 location: Location,
192 source: common_meta::error::Error,
193 },
194
195 #[snafu(display("Failed to list catalogs"))]
196 ListCatalogs {
197 #[snafu(implicit)]
198 location: Location,
199 source: BoxedError,
200 },
201
202 #[snafu(display("Failed to list {}'s schemas", catalog))]
203 ListSchemas {
204 #[snafu(implicit)]
205 location: Location,
206 catalog: String,
207 source: BoxedError,
208 },
209
210 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
211 ListTables {
212 #[snafu(implicit)]
213 location: Location,
214 catalog: String,
215 schema: String,
216 source: BoxedError,
217 },
218
219 #[snafu(display("Failed to join a future"))]
220 Join {
221 #[snafu(implicit)]
222 location: Location,
223 #[snafu(source)]
224 error: JoinError,
225 },
226
227 #[snafu(display(
228 "Failed to request {}, required: {}, but only {} available",
229 select_target,
230 required,
231 available
232 ))]
233 NoEnoughAvailableNode {
234 #[snafu(implicit)]
235 location: Location,
236 required: usize,
237 available: usize,
238 select_target: SelectTarget,
239 },
240
241 #[snafu(display("Failed to send shutdown signal"))]
242 SendShutdownSignal {
243 #[snafu(source)]
244 error: SendError<()>,
245 },
246
247 #[snafu(display("Failed to shutdown {} server", server))]
248 ShutdownServer {
249 #[snafu(implicit)]
250 location: Location,
251 source: servers::error::Error,
252 server: String,
253 },
254
255 #[snafu(display("Empty key is not allowed"))]
256 EmptyKey {
257 #[snafu(implicit)]
258 location: Location,
259 },
260
261 #[snafu(display("Failed to execute via Etcd"))]
262 EtcdFailed {
263 #[snafu(source)]
264 error: etcd_client::Error,
265 #[snafu(implicit)]
266 location: Location,
267 },
268
269 #[snafu(display("Failed to connect to Etcd"))]
270 ConnectEtcd {
271 #[snafu(source)]
272 error: etcd_client::Error,
273 #[snafu(implicit)]
274 location: Location,
275 },
276
277 #[snafu(display("Failed to read file: {}", path))]
278 FileIo {
279 #[snafu(source)]
280 error: std::io::Error,
281 #[snafu(implicit)]
282 location: Location,
283 path: String,
284 },
285
286 #[snafu(display("Failed to bind address {}", addr))]
287 TcpBind {
288 addr: String,
289 #[snafu(source)]
290 error: std::io::Error,
291 #[snafu(implicit)]
292 location: Location,
293 },
294
295 #[snafu(display("Failed to start gRPC server"))]
296 StartGrpc {
297 #[snafu(source)]
298 error: tonic::transport::Error,
299 #[snafu(implicit)]
300 location: Location,
301 },
302
303 #[snafu(display("Failed to start http server"))]
304 StartHttp {
305 #[snafu(implicit)]
306 location: Location,
307 source: servers::error::Error,
308 },
309
310 #[snafu(display("Failed to parse address {}", addr))]
311 ParseAddr {
312 addr: String,
313 #[snafu(source)]
314 error: std::net::AddrParseError,
315 },
316
317 #[snafu(display("Invalid lease key: {}", key))]
318 InvalidLeaseKey {
319 key: String,
320 #[snafu(implicit)]
321 location: Location,
322 },
323
324 #[snafu(display("Invalid datanode stat key: {}", key))]
325 InvalidStatKey {
326 key: String,
327 #[snafu(implicit)]
328 location: Location,
329 },
330
331 #[snafu(display("Invalid inactive region key: {}", key))]
332 InvalidInactiveRegionKey {
333 key: String,
334 #[snafu(implicit)]
335 location: Location,
336 },
337
338 #[snafu(display("Failed to parse lease key from utf8"))]
339 LeaseKeyFromUtf8 {
340 #[snafu(source)]
341 error: std::string::FromUtf8Error,
342 #[snafu(implicit)]
343 location: Location,
344 },
345
346 #[snafu(display("Failed to parse lease value from utf8"))]
347 LeaseValueFromUtf8 {
348 #[snafu(source)]
349 error: std::string::FromUtf8Error,
350 #[snafu(implicit)]
351 location: Location,
352 },
353
354 #[snafu(display("Failed to parse invalid region key from utf8"))]
355 InvalidRegionKeyFromUtf8 {
356 #[snafu(source)]
357 error: std::string::FromUtf8Error,
358 #[snafu(implicit)]
359 location: Location,
360 },
361
362 #[snafu(display("Failed to serialize to json: {}", input))]
363 SerializeToJson {
364 input: String,
365 #[snafu(source)]
366 error: serde_json::error::Error,
367 #[snafu(implicit)]
368 location: Location,
369 },
370
371 #[snafu(display("Failed to deserialize from json: {}", input))]
372 DeserializeFromJson {
373 input: String,
374 #[snafu(source)]
375 error: serde_json::error::Error,
376 #[snafu(implicit)]
377 location: Location,
378 },
379
380 #[snafu(display("Failed to serialize config"))]
381 SerializeConfig {
382 #[snafu(source)]
383 error: serde_json::error::Error,
384 #[snafu(implicit)]
385 location: Location,
386 },
387
388 #[snafu(display("Failed to parse number: {}", err_msg))]
389 ParseNum {
390 err_msg: String,
391 #[snafu(source)]
392 error: std::num::ParseIntError,
393 #[snafu(implicit)]
394 location: Location,
395 },
396
397 #[snafu(display("Failed to parse bool: {}", err_msg))]
398 ParseBool {
399 err_msg: String,
400 #[snafu(source)]
401 error: std::str::ParseBoolError,
402 #[snafu(implicit)]
403 location: Location,
404 },
405
406 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
407 DowngradeLeader {
408 region_id: RegionId,
409 #[snafu(implicit)]
410 location: Location,
411 #[snafu(source)]
412 source: BoxedError,
413 },
414
415 #[snafu(display("Region's leader peer changed: {}", msg))]
416 LeaderPeerChanged {
417 msg: String,
418 #[snafu(implicit)]
419 location: Location,
420 },
421
422 #[snafu(display("Invalid arguments: {}", err_msg))]
423 InvalidArguments {
424 err_msg: String,
425 #[snafu(implicit)]
426 location: Location,
427 },
428
429 #[cfg(feature = "mysql_kvbackend")]
430 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
431 ParseMySqlUrl {
432 #[snafu(source)]
433 error: sqlx::error::Error,
434 mysql_url: String,
435 #[snafu(implicit)]
436 location: Location,
437 },
438
439 #[cfg(feature = "mysql_kvbackend")]
440 #[snafu(display("Failed to decode sql value"))]
441 DecodeSqlValue {
442 #[snafu(source)]
443 error: sqlx::error::Error,
444 #[snafu(implicit)]
445 location: Location,
446 },
447
448 #[snafu(display("Failed to find table route for {table_id}"))]
449 TableRouteNotFound {
450 table_id: TableId,
451 #[snafu(implicit)]
452 location: Location,
453 },
454
455 #[snafu(display("Failed to find table route for {region_id}"))]
456 RegionRouteNotFound {
457 region_id: RegionId,
458 #[snafu(implicit)]
459 location: Location,
460 },
461
462 #[snafu(display("Table info not found: {}", table_id))]
463 TableInfoNotFound {
464 table_id: TableId,
465 #[snafu(implicit)]
466 location: Location,
467 },
468
469 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
470 DatanodeTableNotFound {
471 table_id: TableId,
472 datanode_id: DatanodeId,
473 #[snafu(implicit)]
474 location: Location,
475 },
476
477 #[snafu(display("Metasrv has no leader at this moment"))]
478 NoLeader {
479 #[snafu(implicit)]
480 location: Location,
481 },
482
483 #[snafu(display("Leader lease expired"))]
484 LeaderLeaseExpired {
485 #[snafu(implicit)]
486 location: Location,
487 },
488
489 #[snafu(display("Leader lease changed during election"))]
490 LeaderLeaseChanged {
491 #[snafu(implicit)]
492 location: Location,
493 },
494
495 #[snafu(display("Table {} not found", name))]
496 TableNotFound {
497 name: String,
498 #[snafu(implicit)]
499 location: Location,
500 },
501
502 #[snafu(display("Unsupported selector type, {}", selector_type))]
503 UnsupportedSelectorType {
504 selector_type: String,
505 #[snafu(implicit)]
506 location: Location,
507 },
508
509 #[snafu(display("Unexpected, violated: {violated}"))]
510 Unexpected {
511 violated: String,
512 #[snafu(implicit)]
513 location: Location,
514 },
515
516 #[snafu(display("Failed to create gRPC channel"))]
517 CreateChannel {
518 #[snafu(implicit)]
519 location: Location,
520 source: common_grpc::error::Error,
521 },
522
523 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
524 BatchGet {
525 #[snafu(source)]
526 error: tonic::Status,
527 #[snafu(implicit)]
528 location: Location,
529 },
530
531 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
532 Range {
533 #[snafu(source)]
534 error: tonic::Status,
535 #[snafu(implicit)]
536 location: Location,
537 },
538
539 #[snafu(display("Response header not found"))]
540 ResponseHeaderNotFound {
541 #[snafu(implicit)]
542 location: Location,
543 },
544
545 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
546 IsNotLeader {
547 node_addr: String,
548 #[snafu(implicit)]
549 location: Location,
550 },
551
552 #[snafu(display("Invalid http body"))]
553 InvalidHttpBody {
554 #[snafu(source)]
555 error: http::Error,
556 #[snafu(implicit)]
557 location: Location,
558 },
559
560 #[snafu(display(
561 "The number of retries for the grpc call {} exceeded the limit, {}",
562 func_name,
563 retry_num
564 ))]
565 ExceededRetryLimit {
566 func_name: String,
567 retry_num: usize,
568 #[snafu(implicit)]
569 location: Location,
570 },
571
572 #[snafu(display("Invalid utf-8 value"))]
573 InvalidUtf8Value {
574 #[snafu(source)]
575 error: std::string::FromUtf8Error,
576 #[snafu(implicit)]
577 location: Location,
578 },
579
580 #[snafu(display("Missing required parameter, param: {:?}", param))]
581 MissingRequiredParameter { param: String },
582
583 #[snafu(display("Failed to start procedure manager"))]
584 StartProcedureManager {
585 #[snafu(implicit)]
586 location: Location,
587 source: common_procedure::Error,
588 },
589
590 #[snafu(display("Failed to stop procedure manager"))]
591 StopProcedureManager {
592 #[snafu(implicit)]
593 location: Location,
594 source: common_procedure::Error,
595 },
596
597 #[snafu(display("Failed to wait procedure done"))]
598 WaitProcedure {
599 #[snafu(implicit)]
600 location: Location,
601 source: common_procedure::Error,
602 },
603
604 #[snafu(display("Failed to query procedure state"))]
605 QueryProcedure {
606 #[snafu(implicit)]
607 location: Location,
608 source: common_procedure::Error,
609 },
610
611 #[snafu(display("Procedure not found: {pid}"))]
612 ProcedureNotFound {
613 #[snafu(implicit)]
614 location: Location,
615 pid: String,
616 },
617
618 #[snafu(display("Failed to submit procedure"))]
619 SubmitProcedure {
620 #[snafu(implicit)]
621 location: Location,
622 source: common_procedure::Error,
623 },
624
625 #[snafu(display("A prune task for topic {} is already running", topic))]
626 PruneTaskAlreadyRunning {
627 topic: String,
628 #[snafu(implicit)]
629 location: Location,
630 },
631
632 #[snafu(display("Schema already exists, name: {schema_name}"))]
633 SchemaAlreadyExists {
634 schema_name: String,
635 #[snafu(implicit)]
636 location: Location,
637 },
638
639 #[snafu(display("Table already exists: {table_name}"))]
640 TableAlreadyExists {
641 table_name: String,
642 #[snafu(implicit)]
643 location: Location,
644 },
645
646 #[snafu(display("Pusher not found: {pusher_id}"))]
647 PusherNotFound {
648 pusher_id: String,
649 #[snafu(implicit)]
650 location: Location,
651 },
652
653 #[snafu(display("Failed to push message: {err_msg}"))]
654 PushMessage {
655 err_msg: String,
656 #[snafu(implicit)]
657 location: Location,
658 },
659
660 #[snafu(display("Mailbox already closed: {id}"))]
661 MailboxClosed {
662 id: u64,
663 #[snafu(implicit)]
664 location: Location,
665 },
666
667 #[snafu(display("Mailbox timeout: {id}"))]
668 MailboxTimeout {
669 id: u64,
670 #[snafu(implicit)]
671 location: Location,
672 },
673
674 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
675 MailboxReceiver {
676 id: u64,
677 err_msg: String,
678 #[snafu(implicit)]
679 location: Location,
680 },
681
682 #[snafu(display("Mailbox channel closed: {channel}"))]
683 MailboxChannelClosed {
684 channel: Channel,
685 #[snafu(implicit)]
686 location: Location,
687 },
688
689 #[snafu(display("Missing request header"))]
690 MissingRequestHeader {
691 #[snafu(implicit)]
692 location: Location,
693 },
694
695 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
696 RegisterProcedureLoader {
697 type_name: String,
698 #[snafu(implicit)]
699 location: Location,
700 source: common_procedure::error::Error,
701 },
702
703 #[snafu(display(
704 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
705 mailbox_message,
706 reason
707 ))]
708 UnexpectedInstructionReply {
709 mailbox_message: String,
710 reason: String,
711 #[snafu(implicit)]
712 location: Location,
713 },
714
715 #[snafu(display("Expected to retry later, reason: {}", reason))]
716 RetryLater {
717 reason: String,
718 #[snafu(implicit)]
719 location: Location,
720 },
721
722 #[snafu(display("Expected to retry later, reason: {}", reason))]
723 RetryLaterWithSource {
724 reason: String,
725 #[snafu(implicit)]
726 location: Location,
727 source: BoxedError,
728 },
729
730 #[snafu(display("Failed to convert proto data"))]
731 ConvertProtoData {
732 #[snafu(implicit)]
733 location: Location,
734 source: common_meta::error::Error,
735 },
736
737 #[snafu(display("Other error"))]
740 Other {
741 source: BoxedError,
742 #[snafu(implicit)]
743 location: Location,
744 },
745
746 #[snafu(display("Table metadata manager error"))]
747 TableMetadataManager {
748 source: common_meta::error::Error,
749 #[snafu(implicit)]
750 location: Location,
751 },
752
753 #[snafu(display("Runtime switch manager error"))]
754 RuntimeSwitchManager {
755 source: common_meta::error::Error,
756 #[snafu(implicit)]
757 location: Location,
758 },
759
760 #[snafu(display("Keyvalue backend error"))]
761 KvBackend {
762 source: common_meta::error::Error,
763 #[snafu(implicit)]
764 location: Location,
765 },
766
767 #[snafu(display("Failed to publish message"))]
768 PublishMessage {
769 #[snafu(source)]
770 error: SendError<Message>,
771 #[snafu(implicit)]
772 location: Location,
773 },
774
775 #[snafu(display("Too many partitions"))]
776 TooManyPartitions {
777 #[snafu(implicit)]
778 location: Location,
779 },
780
781 #[snafu(display("Failed to create repartition subtasks"))]
782 RepartitionCreateSubtasks {
783 source: partition::error::Error,
784 #[snafu(implicit)]
785 location: Location,
786 },
787
788 #[snafu(display(
789 "Source partition expression '{}' does not match any existing region",
790 expr
791 ))]
792 RepartitionSourceExprMismatch {
793 expr: String,
794 #[snafu(implicit)]
795 location: Location,
796 },
797
798 #[snafu(display(
799 "Failed to get the state receiver for repartition subprocedure {}",
800 procedure_id
801 ))]
802 RepartitionSubprocedureStateReceiver {
803 procedure_id: ProcedureId,
804 #[snafu(source)]
805 source: common_procedure::Error,
806 #[snafu(implicit)]
807 location: Location,
808 },
809
810 #[snafu(display("Unsupported operation {}", operation))]
811 Unsupported {
812 operation: String,
813 #[snafu(implicit)]
814 location: Location,
815 },
816
817 #[snafu(display("Unexpected table route type: {}", err_msg))]
818 UnexpectedLogicalRouteTable {
819 #[snafu(implicit)]
820 location: Location,
821 err_msg: String,
822 source: common_meta::error::Error,
823 },
824
825 #[snafu(display("Failed to save cluster info"))]
826 SaveClusterInfo {
827 #[snafu(implicit)]
828 location: Location,
829 source: common_meta::error::Error,
830 },
831
832 #[snafu(display("Invalid cluster info format"))]
833 InvalidClusterInfoFormat {
834 #[snafu(implicit)]
835 location: Location,
836 source: common_meta::error::Error,
837 },
838
839 #[snafu(display("Invalid datanode stat format"))]
840 InvalidDatanodeStatFormat {
841 #[snafu(implicit)]
842 location: Location,
843 source: common_meta::error::Error,
844 },
845
846 #[snafu(display("Invalid node info format"))]
847 InvalidNodeInfoFormat {
848 #[snafu(implicit)]
849 location: Location,
850 source: common_meta::error::Error,
851 },
852
853 #[snafu(display("Failed to serialize options to TOML"))]
854 TomlFormat {
855 #[snafu(implicit)]
856 location: Location,
857 #[snafu(source(from(common_config::error::Error, Box::new)))]
858 source: Box<common_config::error::Error>,
859 },
860
861 #[cfg(feature = "pg_kvbackend")]
862 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
863 PostgresExecution {
864 #[snafu(source)]
865 error: tokio_postgres::Error,
866 sql: String,
867 #[snafu(implicit)]
868 location: Location,
869 },
870
871 #[cfg(feature = "pg_kvbackend")]
872 #[snafu(display("Failed to get Postgres client"))]
873 GetPostgresClient {
874 #[snafu(implicit)]
875 location: Location,
876 #[snafu(source)]
877 error: deadpool::managed::PoolError<tokio_postgres::Error>,
878 },
879
880 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
881 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
882 SqlExecutionTimeout {
883 #[snafu(implicit)]
884 location: Location,
885 sql: String,
886 duration: std::time::Duration,
887 },
888
889 #[cfg(feature = "pg_kvbackend")]
890 #[snafu(display("Failed to create connection pool for Postgres"))]
891 CreatePostgresPool {
892 #[snafu(source)]
893 error: deadpool_postgres::CreatePoolError,
894 #[snafu(implicit)]
895 location: Location,
896 },
897
898 #[cfg(feature = "pg_kvbackend")]
899 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
900 GetPostgresConnection {
901 reason: String,
902 #[snafu(implicit)]
903 location: Location,
904 },
905
906 #[cfg(feature = "mysql_kvbackend")]
907 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
908 MySqlExecution {
909 #[snafu(source)]
910 error: sqlx::Error,
911 #[snafu(implicit)]
912 location: Location,
913 sql: String,
914 },
915
916 #[cfg(feature = "mysql_kvbackend")]
917 #[snafu(display("Failed to create mysql pool"))]
918 CreateMySqlPool {
919 #[snafu(source)]
920 error: sqlx::Error,
921 #[snafu(implicit)]
922 location: Location,
923 },
924
925 #[cfg(feature = "mysql_kvbackend")]
926 #[snafu(display("Failed to acquire mysql client from pool"))]
927 AcquireMySqlClient {
928 #[snafu(source)]
929 error: sqlx::Error,
930 #[snafu(implicit)]
931 location: Location,
932 },
933
934 #[snafu(display("Handler not found: {}", name))]
935 HandlerNotFound {
936 name: String,
937 #[snafu(implicit)]
938 location: Location,
939 },
940
941 #[snafu(display("Flow state handler error"))]
942 FlowStateHandler {
943 #[snafu(implicit)]
944 location: Location,
945 source: common_meta::error::Error,
946 },
947
948 #[snafu(display("Failed to build wal provider"))]
949 BuildWalProvider {
950 #[snafu(implicit)]
951 location: Location,
952 source: common_meta::error::Error,
953 },
954
955 #[snafu(display("Failed to build kafka client."))]
956 BuildKafkaClient {
957 #[snafu(implicit)]
958 location: Location,
959 #[snafu(source)]
960 error: common_meta::error::Error,
961 },
962
963 #[snafu(display(
964 "Failed to build a Kafka partition client, topic: {}, partition: {}",
965 topic,
966 partition
967 ))]
968 BuildPartitionClient {
969 topic: String,
970 partition: i32,
971 #[snafu(implicit)]
972 location: Location,
973 #[snafu(source)]
974 error: rskafka::client::error::Error,
975 },
976
977 #[snafu(display(
978 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
979 topic,
980 partition,
981 offset
982 ))]
983 DeleteRecords {
984 #[snafu(implicit)]
985 location: Location,
986 #[snafu(source)]
987 error: rskafka::client::error::Error,
988 topic: String,
989 partition: i32,
990 offset: u64,
991 },
992
993 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
994 GetOffset {
995 topic: String,
996 #[snafu(implicit)]
997 location: Location,
998 #[snafu(source)]
999 error: rskafka::client::error::Error,
1000 },
1001
1002 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
1003 UpdateTopicNameValue {
1004 topic: String,
1005 #[snafu(implicit)]
1006 location: Location,
1007 #[snafu(source)]
1008 source: common_meta::error::Error,
1009 },
1010
1011 #[snafu(display("Failed to build tls options"))]
1012 BuildTlsOptions {
1013 #[snafu(implicit)]
1014 location: Location,
1015 #[snafu(source)]
1016 source: common_meta::error::Error,
1017 },
1018
1019 #[snafu(display(
1020 "Repartition group {} source region missing, region id: {}",
1021 group_id,
1022 region_id
1023 ))]
1024 RepartitionSourceRegionMissing {
1025 group_id: Uuid,
1026 region_id: RegionId,
1027 #[snafu(implicit)]
1028 location: Location,
1029 },
1030
1031 #[snafu(display(
1032 "Repartition group {} target region missing, region id: {}",
1033 group_id,
1034 region_id
1035 ))]
1036 RepartitionTargetRegionMissing {
1037 group_id: Uuid,
1038 region_id: RegionId,
1039 #[snafu(implicit)]
1040 location: Location,
1041 },
1042
1043 #[snafu(display("Failed to serialize partition expression"))]
1044 SerializePartitionExpr {
1045 #[snafu(source)]
1046 source: partition::error::Error,
1047 #[snafu(implicit)]
1048 location: Location,
1049 },
1050
1051 #[snafu(display("Failed to deserialize partition expression"))]
1052 DeserializePartitionExpr {
1053 #[snafu(source)]
1054 source: partition::error::Error,
1055 #[snafu(implicit)]
1056 location: Location,
1057 },
1058
1059 #[snafu(display("Empty partition expression"))]
1060 EmptyPartitionExpr {
1061 #[snafu(implicit)]
1062 location: Location,
1063 },
1064
1065 #[snafu(display(
1066 "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1067 region_id,
1068 expected,
1069 actual
1070 ))]
1071 PartitionExprMismatch {
1072 region_id: RegionId,
1073 expected: String,
1074 actual: String,
1075 #[snafu(implicit)]
1076 location: Location,
1077 },
1078
1079 #[snafu(display("Failed to allocate regions for table: {}", table_id))]
1080 AllocateRegions {
1081 #[snafu(implicit)]
1082 location: Location,
1083 table_id: TableId,
1084 #[snafu(source)]
1085 source: common_meta::error::Error,
1086 },
1087
1088 #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
1089 DeallocateRegions {
1090 #[snafu(implicit)]
1091 location: Location,
1092 table_id: TableId,
1093 #[snafu(source)]
1094 source: common_meta::error::Error,
1095 },
1096
1097 #[snafu(display("Failed to build create request for table: {}", table_id))]
1098 BuildCreateRequest {
1099 #[snafu(implicit)]
1100 location: Location,
1101 table_id: TableId,
1102 #[snafu(source)]
1103 source: common_meta::error::Error,
1104 },
1105
1106 #[snafu(display("Failed to allocate region routes for table: {}", table_id))]
1107 AllocateRegionRoutes {
1108 #[snafu(implicit)]
1109 location: Location,
1110 table_id: TableId,
1111 #[snafu(source)]
1112 source: common_meta::error::Error,
1113 },
1114
1115 #[snafu(display("Failed to allocate wal options for table: {}", table_id))]
1116 AllocateWalOptions {
1117 #[snafu(implicit)]
1118 location: Location,
1119 table_id: TableId,
1120 #[snafu(source)]
1121 source: common_meta::error::Error,
1122 },
1123}
1124
1125impl Error {
1126 pub fn is_retryable(&self) -> bool {
1128 self.retry_hint().is_retryable()
1129 }
1130}
1131
1132pub type Result<T> = std::result::Result<T, Error>;
1133
1134define_into_tonic_status!(Error);
1135
1136impl ErrorExt for Error {
1137 fn status_code(&self) -> StatusCode {
1138 match self {
1139 Error::EtcdFailed { .. }
1140 | Error::ConnectEtcd { .. }
1141 | Error::FileIo { .. }
1142 | Error::TcpBind { .. }
1143 | Error::SerializeConfig { .. }
1144 | Error::SerializeToJson { .. }
1145 | Error::DeserializeFromJson { .. }
1146 | Error::NoLeader { .. }
1147 | Error::LeaderLeaseExpired { .. }
1148 | Error::LeaderLeaseChanged { .. }
1149 | Error::CreateChannel { .. }
1150 | Error::BatchGet { .. }
1151 | Error::Range { .. }
1152 | Error::ResponseHeaderNotFound { .. }
1153 | Error::InvalidHttpBody { .. }
1154 | Error::ExceededRetryLimit { .. }
1155 | Error::SendShutdownSignal { .. }
1156 | Error::PushMessage { .. }
1157 | Error::MailboxClosed { .. }
1158 | Error::MailboxReceiver { .. }
1159 | Error::StartGrpc { .. }
1160 | Error::PublishMessage { .. }
1161 | Error::Join { .. }
1162 | Error::ChooseItems { .. }
1163 | Error::FlowStateHandler { .. }
1164 | Error::BuildWalProvider { .. }
1165 | Error::BuildPartitionClient { .. }
1166 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1167
1168 Error::DeleteRecords { .. }
1169 | Error::GetOffset { .. }
1170 | Error::PeerUnavailable { .. }
1171 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1172 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1173 Error::PruneTaskAlreadyRunning { .. }
1174 | Error::RetryLater { .. }
1175 | Error::MailboxChannelClosed { .. }
1176 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1177 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1178 Error::SerializePartitionExpr { source, .. }
1179 | Error::DeserializePartitionExpr { source, .. } => source.status_code(),
1180
1181 Error::Unsupported { .. } => StatusCode::Unsupported,
1182
1183 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1184
1185 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1186 Error::EmptyKey { .. }
1187 | Error::MissingRequiredParameter { .. }
1188 | Error::MissingRequestHeader { .. }
1189 | Error::InvalidLeaseKey { .. }
1190 | Error::InvalidStatKey { .. }
1191 | Error::InvalidInactiveRegionKey { .. }
1192 | Error::ParseNum { .. }
1193 | Error::ParseBool { .. }
1194 | Error::ParseAddr { .. }
1195 | Error::UnsupportedSelectorType { .. }
1196 | Error::InvalidArguments { .. }
1197 | Error::ProcedureNotFound { .. }
1198 | Error::TooManyPartitions { .. }
1199 | Error::TomlFormat { .. }
1200 | Error::HandlerNotFound { .. }
1201 | Error::LeaderPeerChanged { .. }
1202 | Error::RepartitionSourceRegionMissing { .. }
1203 | Error::RepartitionTargetRegionMissing { .. }
1204 | Error::PartitionExprMismatch { .. }
1205 | Error::RepartitionSourceExprMismatch { .. }
1206 | Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
1207 Error::LeaseKeyFromUtf8 { .. }
1208 | Error::LeaseValueFromUtf8 { .. }
1209 | Error::InvalidRegionKeyFromUtf8 { .. }
1210 | Error::TableRouteNotFound { .. }
1211 | Error::TableInfoNotFound { .. }
1212 | Error::DatanodeTableNotFound { .. }
1213 | Error::InvalidUtf8Value { .. }
1214 | Error::UnexpectedInstructionReply { .. }
1215 | Error::Unexpected { .. }
1216 | Error::RegionOperatingRace { .. }
1217 | Error::RegionRouteNotFound { .. }
1218 | Error::MigrationAbort { .. }
1219 | Error::MigrationRunning { .. }
1220 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1221 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1222 Error::SaveClusterInfo { source, .. }
1223 | Error::InvalidClusterInfoFormat { source, .. }
1224 | Error::InvalidDatanodeStatFormat { source, .. }
1225 | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1226 Error::InvalidateTableCache { source, .. } => source.status_code(),
1227 Error::SubmitProcedure { source, .. }
1228 | Error::WaitProcedure { source, .. }
1229 | Error::QueryProcedure { source, .. } => source.status_code(),
1230 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1231 source.status_code()
1232 }
1233 Error::StartProcedureManager { source, .. }
1234 | Error::StopProcedureManager { source, .. } => source.status_code(),
1235
1236 Error::ListCatalogs { source, .. }
1237 | Error::ListSchemas { source, .. }
1238 | Error::ListTables { source, .. } => source.status_code(),
1239 Error::StartTelemetryTask { source, .. } => source.status_code(),
1240
1241 Error::NextSequence { source, .. }
1242 | Error::SetNextSequence { source, .. }
1243 | Error::PeekSequence { source, .. } => source.status_code(),
1244 Error::DowngradeLeader { source, .. } => source.status_code(),
1245 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1246 Error::SubmitDdlTask { source, .. }
1247 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1248 Error::ConvertProtoData { source, .. }
1249 | Error::TableMetadataManager { source, .. }
1250 | Error::RuntimeSwitchManager { source, .. }
1251 | Error::KvBackend { source, .. }
1252 | Error::UnexpectedLogicalRouteTable { source, .. }
1253 | Error::UpdateTopicNameValue { source, .. } => source.status_code(),
1254 Error::ListActiveFrontends { source, .. }
1255 | Error::ListActiveDatanodes { source, .. }
1256 | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1257 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1258
1259 Error::InitMetadata { source, .. }
1260 | Error::InitDdlManager { source, .. }
1261 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1262
1263 Error::BuildTlsOptions { source, .. } => source.status_code(),
1264 Error::Other { source, .. } => source.status_code(),
1265 Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
1266 Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
1267 Error::AllocateRegions { source, .. } => source.status_code(),
1268 Error::DeallocateRegions { source, .. } => source.status_code(),
1269 Error::AllocateRegionRoutes { source, .. } => source.status_code(),
1270 Error::AllocateWalOptions { source, .. } => source.status_code(),
1271 Error::BuildCreateRequest { source, .. } => source.status_code(),
1272 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1273
1274 #[cfg(feature = "pg_kvbackend")]
1275 Error::CreatePostgresPool { .. }
1276 | Error::GetPostgresClient { .. }
1277 | Error::GetPostgresConnection { .. }
1278 | Error::PostgresExecution { .. } => StatusCode::Internal,
1279 #[cfg(feature = "mysql_kvbackend")]
1280 Error::MySqlExecution { .. }
1281 | Error::CreateMySqlPool { .. }
1282 | Error::ParseMySqlUrl { .. }
1283 | Error::DecodeSqlValue { .. }
1284 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1285 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1286 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1287 }
1288 }
1289
1290 fn as_any(&self) -> &dyn std::any::Any {
1291 self
1292 }
1293
1294 fn retry_hint(&self) -> RetryHint {
1295 match self {
1296 Error::RetryLater { .. }
1297 | Error::RetryLaterWithSource { .. }
1298 | Error::MailboxTimeout { .. }
1299 | Error::NoEnoughAvailableNode { .. }
1300 | Error::NoLeader { .. }
1301 | Error::LeaderLeaseExpired { .. }
1302 | Error::LeaderLeaseChanged { .. }
1303 | Error::PeerUnavailable { .. } => RetryHint::Retryable,
1304
1305 Error::ConnectEtcd { error, .. } | Error::EtcdFailed { error, .. } => {
1306 common_meta::error::retry_hint_from_etcd_error(error)
1307 }
1308
1309 #[cfg(feature = "pg_kvbackend")]
1310 Error::PostgresExecution { error, .. } => {
1311 common_meta::error::retry_hint_from_postgres_error(error)
1312 }
1313 #[cfg(feature = "pg_kvbackend")]
1314 Error::GetPostgresClient { error, .. } => {
1315 common_meta::error::retry_hint_from_postgres_pool_error(error)
1316 }
1317 #[cfg(feature = "mysql_kvbackend")]
1318 Error::MySqlExecution { error, .. }
1319 | Error::CreateMySqlPool { error, .. }
1320 | Error::AcquireMySqlClient { error, .. } => {
1321 common_meta::error::retry_hint_from_sqlx_error(error)
1322 }
1323 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1324 Error::SqlExecutionTimeout { .. } => RetryHint::Retryable,
1325
1326 Error::ListActiveFrontends { source, .. }
1327 | Error::ListActiveDatanodes { source, .. }
1328 | Error::ListActiveFlownodes { source, .. }
1329 | Error::InitDdlManager { source, .. }
1330 | Error::InitReconciliationManager { source, .. }
1331 | Error::InitMetadata { source, .. }
1332 | Error::NextSequence { source, .. }
1333 | Error::SetNextSequence { source, .. }
1334 | Error::PeekSequence { source, .. }
1335 | Error::SubmitDdlTask { source, .. }
1336 | Error::SubmitReconcileProcedure { source, .. }
1337 | Error::InvalidateTableCache { source, .. }
1338 | Error::ConvertProtoData { source, .. }
1339 | Error::TableMetadataManager { source, .. }
1340 | Error::RuntimeSwitchManager { source, .. }
1341 | Error::KvBackend { source, .. }
1342 | Error::UnexpectedLogicalRouteTable { source, .. }
1343 | Error::SaveClusterInfo { source, .. }
1344 | Error::InvalidClusterInfoFormat { source, .. }
1345 | Error::InvalidDatanodeStatFormat { source, .. }
1346 | Error::InvalidNodeInfoFormat { source, .. }
1347 | Error::FlowStateHandler { source, .. }
1348 | Error::BuildWalProvider { source, .. }
1349 | Error::BuildKafkaClient { error: source, .. }
1350 | Error::UpdateTopicNameValue { source, .. }
1351 | Error::BuildTlsOptions { source, .. }
1352 | Error::AllocateRegions { source, .. }
1353 | Error::DeallocateRegions { source, .. }
1354 | Error::BuildCreateRequest { source, .. }
1355 | Error::AllocateRegionRoutes { source, .. }
1356 | Error::AllocateWalOptions { source, .. } => source.retry_hint(),
1357
1358 Error::Other { source, .. }
1359 | Error::ListCatalogs { source, .. }
1360 | Error::ListSchemas { source, .. }
1361 | Error::ListTables { source, .. }
1362 | Error::DowngradeLeader { source, .. } => source.retry_hint(),
1363
1364 Error::SubmitProcedure { source, .. }
1365 | Error::WaitProcedure { source, .. }
1366 | Error::QueryProcedure { source, .. }
1367 | Error::StartProcedureManager { source, .. }
1368 | Error::StopProcedureManager { source, .. }
1369 | Error::RegisterProcedureLoader { source, .. }
1370 | Error::RepartitionSubprocedureStateReceiver { source, .. } => source.retry_hint(),
1371
1372 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1373 source.retry_hint()
1374 }
1375 Error::StartTelemetryTask { source, .. } => source.retry_hint(),
1376 Error::CreateChannel { source, .. } => source.retry_hint(),
1377 Error::RepartitionCreateSubtasks { source, .. } => source.retry_hint(),
1378 Error::SerializePartitionExpr { source, .. }
1379 | Error::DeserializePartitionExpr { source, .. } => source.retry_hint(),
1380
1381 Error::DeleteRecords { error, .. }
1382 | Error::BuildPartitionClient { error, .. }
1383 | Error::GetOffset { error, .. } => rskafka_client_error_to_retry_hint(error),
1384
1385 Error::PusherNotFound { .. }
1386 | Error::PushMessage { .. }
1387 | Error::ExceededDeadline { .. } => RetryHint::NonRetryable,
1388
1389 _ => RetryHint::NonRetryable,
1390 }
1391 }
1392}
1393
1394pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1396 let mut err: &(dyn std::error::Error + 'static) = err_status;
1397
1398 loop {
1399 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1400 return Some(io_err);
1401 }
1402
1403 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1406 && let Some(io_err) = h2_err.get_io()
1407 {
1408 return Some(io_err);
1409 }
1410
1411 err = err.source()?;
1412 }
1413}
1414
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_error::ext::ErrorExt;
    use common_error::mock::MockError;
    use common_error::status_code::StatusCode;
    use rskafka::BackoffError;
    use rskafka::client::error::Error as KafkaClientError;
    use snafu::ResultExt;

    use super::{
        BuildPartitionClientSnafu, DeallocateRegionsSnafu, DeleteRecordsSnafu, Error,
        GetOffsetSnafu,
    };

    /// A Kafka client error reporting that the retry backoff hit its deadline.
    fn retry_failed_kafka_error() -> KafkaClientError {
        let deadline = Duration::from_secs(1);
        KafkaClientError::RetryFailed(BackoffError::DeadlineExceded {
            deadline,
            source: Box::new(std::io::Error::other("retry failed")),
        })
    }

    /// Wraps `source` in a `DeallocateRegions` error for table 1024.
    fn deallocate_regions_error(source: common_meta::error::Error) -> Error {
        Err::<(), _>(source)
            .context(DeallocateRegionsSnafu { table_id: 1024_u32 })
            .unwrap_err()
    }

    /// Wraps a fresh error from `make_error` in each Kafka-backed variant.
    ///
    /// The variants are returned in the order `DeleteRecords`,
    /// `BuildPartitionClient`, `GetOffset`.
    fn kafka_variant_errors(make_error: impl Fn() -> KafkaClientError) -> [Error; 3] {
        let delete_records = Err::<(), _>(make_error())
            .context(DeleteRecordsSnafu {
                topic: "test_topic",
                partition: 0,
                offset: 1024u64,
            })
            .unwrap_err();
        let build_partition_client = Err::<(), _>(make_error())
            .context(BuildPartitionClientSnafu {
                topic: "test_topic",
                partition: 0,
            })
            .unwrap_err();
        let get_offset = Err::<(), _>(make_error())
            .context(GetOffsetSnafu {
                topic: "test_topic",
            })
            .unwrap_err();

        [delete_records, build_partition_client, get_offset]
    }

    #[test]
    fn test_deallocate_regions_is_retryable_when_source_is_retry_later() {
        let err = deallocate_regions_error(common_meta::error::Error::retry_later(
            MockError::new(StatusCode::Internal),
        ));

        assert!(err.is_retryable());
        assert!(err.retry_hint().is_retryable());
    }

    #[test]
    fn test_deallocate_regions_is_not_retryable_when_source_is_not_retry_later() {
        let source = common_meta::error::UnexpectedSnafu {
            err_msg: "mock error",
        }
        .build();
        let err = deallocate_regions_error(source);

        assert!(!err.is_retryable());
        assert!(!err.retry_hint().is_retryable());
    }

    #[test]
    fn test_kafka_retry_failed_errors_are_retryable() {
        let errors = kafka_variant_errors(retry_failed_kafka_error);

        for err in &errors {
            assert!(err.is_retryable());
        }
        for err in &errors {
            assert!(err.retry_hint().is_retryable());
        }
    }

    #[test]
    fn test_kafka_non_retry_failed_errors_are_not_retryable() {
        let errors = kafka_variant_errors(|| KafkaClientError::InvalidResponse("invalid".into()));

        for err in &errors {
            assert!(!err.is_retryable());
        }
        for err in &errors {
            assert!(!err.retry_hint().is_retryable());
        }
    }
}