1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_procedure::ProcedureId;
21use common_runtime::JoinError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::metadata::TableId;
25use tokio::sync::mpsc::error::SendError;
26use tonic::codegen::http;
27use uuid::Uuid;
28
29use crate::metasrv::SelectTarget;
30use crate::pubsub::Message;
31use crate::service::mailbox::Channel;
32
33#[derive(Snafu)]
34#[snafu(visibility(pub))]
35#[stack_trace_debug]
36pub enum Error {
37 #[snafu(display("Failed to choose items"))]
38 ChooseItems {
39 #[snafu(implicit)]
40 location: Location,
41 #[snafu(source)]
42 error: rand::distr::weighted::Error,
43 },
44
45 #[snafu(display("Exceeded deadline, operation: {}", operation))]
46 ExceededDeadline {
47 #[snafu(implicit)]
48 location: Location,
49 operation: String,
50 },
51
52 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
53 PeerUnavailable {
54 #[snafu(implicit)]
55 location: Location,
56 peer_id: u64,
57 },
58
59 #[snafu(display("Failed to list active frontends"))]
60 ListActiveFrontends {
61 #[snafu(implicit)]
62 location: Location,
63 source: common_meta::error::Error,
64 },
65
66 #[snafu(display("Failed to list active datanodes"))]
67 ListActiveDatanodes {
68 #[snafu(implicit)]
69 location: Location,
70 source: common_meta::error::Error,
71 },
72
73 #[snafu(display("Failed to list active flownodes"))]
74 ListActiveFlownodes {
75 #[snafu(implicit)]
76 location: Location,
77 source: common_meta::error::Error,
78 },
79
80 #[snafu(display("No available frontend"))]
81 NoAvailableFrontend {
82 #[snafu(implicit)]
83 location: Location,
84 },
85
86 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
87 MigrationRunning {
88 #[snafu(implicit)]
89 location: Location,
90 region_id: RegionId,
91 },
92
93 #[snafu(display(
94 "The region migration procedure is completed for region: {}, target_peer: {}",
95 region_id,
96 target_peer_id
97 ))]
98 RegionMigrated {
99 #[snafu(implicit)]
100 location: Location,
101 region_id: RegionId,
102 target_peer_id: u64,
103 },
104
105 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
106 MigrationAbort {
107 #[snafu(implicit)]
108 location: Location,
109 reason: String,
110 },
111
112 #[snafu(display(
113 "Another procedure is operating the region: {} on peer: {}",
114 region_id,
115 peer_id
116 ))]
117 RegionOperatingRace {
118 #[snafu(implicit)]
119 location: Location,
120 peer_id: DatanodeId,
121 region_id: RegionId,
122 },
123
124 #[snafu(display("Failed to init ddl manager"))]
125 InitDdlManager {
126 #[snafu(implicit)]
127 location: Location,
128 source: common_meta::error::Error,
129 },
130
131 #[snafu(display("Failed to init reconciliation manager"))]
132 InitReconciliationManager {
133 #[snafu(implicit)]
134 location: Location,
135 source: common_meta::error::Error,
136 },
137
138 #[snafu(display("Failed to create default catalog and schema"))]
139 InitMetadata {
140 #[snafu(implicit)]
141 location: Location,
142 source: common_meta::error::Error,
143 },
144
145 #[snafu(display("Failed to allocate next sequence number"))]
146 NextSequence {
147 #[snafu(implicit)]
148 location: Location,
149 source: common_meta::error::Error,
150 },
151
152 #[snafu(display("Failed to set next sequence number"))]
153 SetNextSequence {
154 #[snafu(implicit)]
155 location: Location,
156 source: common_meta::error::Error,
157 },
158
159 #[snafu(display("Failed to peek sequence number"))]
160 PeekSequence {
161 #[snafu(implicit)]
162 location: Location,
163 source: common_meta::error::Error,
164 },
165
166 #[snafu(display("Failed to start telemetry task"))]
167 StartTelemetryTask {
168 #[snafu(implicit)]
169 location: Location,
170 source: common_runtime::error::Error,
171 },
172
173 #[snafu(display("Failed to submit ddl task"))]
174 SubmitDdlTask {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_meta::error::Error,
178 },
179
180 #[snafu(display("Failed to submit reconcile procedure"))]
181 SubmitReconcileProcedure {
182 #[snafu(implicit)]
183 location: Location,
184 source: common_meta::error::Error,
185 },
186
187 #[snafu(display("Failed to invalidate table cache"))]
188 InvalidateTableCache {
189 #[snafu(implicit)]
190 location: Location,
191 source: common_meta::error::Error,
192 },
193
194 #[snafu(display("Failed to list catalogs"))]
195 ListCatalogs {
196 #[snafu(implicit)]
197 location: Location,
198 source: BoxedError,
199 },
200
201 #[snafu(display("Failed to list {}'s schemas", catalog))]
202 ListSchemas {
203 #[snafu(implicit)]
204 location: Location,
205 catalog: String,
206 source: BoxedError,
207 },
208
209 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
210 ListTables {
211 #[snafu(implicit)]
212 location: Location,
213 catalog: String,
214 schema: String,
215 source: BoxedError,
216 },
217
218 #[snafu(display("Failed to join a future"))]
219 Join {
220 #[snafu(implicit)]
221 location: Location,
222 #[snafu(source)]
223 error: JoinError,
224 },
225
226 #[snafu(display(
227 "Failed to request {}, required: {}, but only {} available",
228 select_target,
229 required,
230 available
231 ))]
232 NoEnoughAvailableNode {
233 #[snafu(implicit)]
234 location: Location,
235 required: usize,
236 available: usize,
237 select_target: SelectTarget,
238 },
239
240 #[snafu(display("Failed to send shutdown signal"))]
241 SendShutdownSignal {
242 #[snafu(source)]
243 error: SendError<()>,
244 },
245
246 #[snafu(display("Failed to shutdown {} server", server))]
247 ShutdownServer {
248 #[snafu(implicit)]
249 location: Location,
250 source: servers::error::Error,
251 server: String,
252 },
253
254 #[snafu(display("Empty key is not allowed"))]
255 EmptyKey {
256 #[snafu(implicit)]
257 location: Location,
258 },
259
260 #[snafu(display("Failed to execute via Etcd"))]
261 EtcdFailed {
262 #[snafu(source)]
263 error: etcd_client::Error,
264 #[snafu(implicit)]
265 location: Location,
266 },
267
268 #[snafu(display("Failed to connect to Etcd"))]
269 ConnectEtcd {
270 #[snafu(source)]
271 error: etcd_client::Error,
272 #[snafu(implicit)]
273 location: Location,
274 },
275
276 #[snafu(display("Failed to read file: {}", path))]
277 FileIo {
278 #[snafu(source)]
279 error: std::io::Error,
280 #[snafu(implicit)]
281 location: Location,
282 path: String,
283 },
284
285 #[snafu(display("Failed to bind address {}", addr))]
286 TcpBind {
287 addr: String,
288 #[snafu(source)]
289 error: std::io::Error,
290 #[snafu(implicit)]
291 location: Location,
292 },
293
294 #[snafu(display("Failed to start gRPC server"))]
295 StartGrpc {
296 #[snafu(source)]
297 error: tonic::transport::Error,
298 #[snafu(implicit)]
299 location: Location,
300 },
301
302 #[snafu(display("Failed to start http server"))]
303 StartHttp {
304 #[snafu(implicit)]
305 location: Location,
306 source: servers::error::Error,
307 },
308
309 #[snafu(display("Failed to parse address {}", addr))]
310 ParseAddr {
311 addr: String,
312 #[snafu(source)]
313 error: std::net::AddrParseError,
314 },
315
316 #[snafu(display("Invalid lease key: {}", key))]
317 InvalidLeaseKey {
318 key: String,
319 #[snafu(implicit)]
320 location: Location,
321 },
322
323 #[snafu(display("Invalid datanode stat key: {}", key))]
324 InvalidStatKey {
325 key: String,
326 #[snafu(implicit)]
327 location: Location,
328 },
329
330 #[snafu(display("Invalid inactive region key: {}", key))]
331 InvalidInactiveRegionKey {
332 key: String,
333 #[snafu(implicit)]
334 location: Location,
335 },
336
337 #[snafu(display("Failed to parse lease key from utf8"))]
338 LeaseKeyFromUtf8 {
339 #[snafu(source)]
340 error: std::string::FromUtf8Error,
341 #[snafu(implicit)]
342 location: Location,
343 },
344
345 #[snafu(display("Failed to parse lease value from utf8"))]
346 LeaseValueFromUtf8 {
347 #[snafu(source)]
348 error: std::string::FromUtf8Error,
349 #[snafu(implicit)]
350 location: Location,
351 },
352
353 #[snafu(display("Failed to parse invalid region key from utf8"))]
354 InvalidRegionKeyFromUtf8 {
355 #[snafu(source)]
356 error: std::string::FromUtf8Error,
357 #[snafu(implicit)]
358 location: Location,
359 },
360
361 #[snafu(display("Failed to serialize to json: {}", input))]
362 SerializeToJson {
363 input: String,
364 #[snafu(source)]
365 error: serde_json::error::Error,
366 #[snafu(implicit)]
367 location: Location,
368 },
369
370 #[snafu(display("Failed to deserialize from json: {}", input))]
371 DeserializeFromJson {
372 input: String,
373 #[snafu(source)]
374 error: serde_json::error::Error,
375 #[snafu(implicit)]
376 location: Location,
377 },
378
379 #[snafu(display("Failed to parse number: {}", err_msg))]
380 ParseNum {
381 err_msg: String,
382 #[snafu(source)]
383 error: std::num::ParseIntError,
384 #[snafu(implicit)]
385 location: Location,
386 },
387
388 #[snafu(display("Failed to parse bool: {}", err_msg))]
389 ParseBool {
390 err_msg: String,
391 #[snafu(source)]
392 error: std::str::ParseBoolError,
393 #[snafu(implicit)]
394 location: Location,
395 },
396
397 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
398 DowngradeLeader {
399 region_id: RegionId,
400 #[snafu(implicit)]
401 location: Location,
402 #[snafu(source)]
403 source: BoxedError,
404 },
405
406 #[snafu(display("Region's leader peer changed: {}", msg))]
407 LeaderPeerChanged {
408 msg: String,
409 #[snafu(implicit)]
410 location: Location,
411 },
412
413 #[snafu(display("Invalid arguments: {}", err_msg))]
414 InvalidArguments {
415 err_msg: String,
416 #[snafu(implicit)]
417 location: Location,
418 },
419
420 #[cfg(feature = "mysql_kvbackend")]
421 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
422 ParseMySqlUrl {
423 #[snafu(source)]
424 error: sqlx::error::Error,
425 mysql_url: String,
426 #[snafu(implicit)]
427 location: Location,
428 },
429
430 #[cfg(feature = "mysql_kvbackend")]
431 #[snafu(display("Failed to decode sql value"))]
432 DecodeSqlValue {
433 #[snafu(source)]
434 error: sqlx::error::Error,
435 #[snafu(implicit)]
436 location: Location,
437 },
438
439 #[snafu(display("Failed to find table route for {table_id}"))]
440 TableRouteNotFound {
441 table_id: TableId,
442 #[snafu(implicit)]
443 location: Location,
444 },
445
446 #[snafu(display("Failed to find table route for {region_id}"))]
447 RegionRouteNotFound {
448 region_id: RegionId,
449 #[snafu(implicit)]
450 location: Location,
451 },
452
453 #[snafu(display("Table info not found: {}", table_id))]
454 TableInfoNotFound {
455 table_id: TableId,
456 #[snafu(implicit)]
457 location: Location,
458 },
459
460 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
461 DatanodeTableNotFound {
462 table_id: TableId,
463 datanode_id: DatanodeId,
464 #[snafu(implicit)]
465 location: Location,
466 },
467
468 #[snafu(display("Metasrv has no leader at this moment"))]
469 NoLeader {
470 #[snafu(implicit)]
471 location: Location,
472 },
473
474 #[snafu(display("Leader lease expired"))]
475 LeaderLeaseExpired {
476 #[snafu(implicit)]
477 location: Location,
478 },
479
480 #[snafu(display("Leader lease changed during election"))]
481 LeaderLeaseChanged {
482 #[snafu(implicit)]
483 location: Location,
484 },
485
486 #[snafu(display("Table {} not found", name))]
487 TableNotFound {
488 name: String,
489 #[snafu(implicit)]
490 location: Location,
491 },
492
493 #[snafu(display("Unsupported selector type, {}", selector_type))]
494 UnsupportedSelectorType {
495 selector_type: String,
496 #[snafu(implicit)]
497 location: Location,
498 },
499
500 #[snafu(display("Unexpected, violated: {violated}"))]
501 Unexpected {
502 violated: String,
503 #[snafu(implicit)]
504 location: Location,
505 },
506
507 #[snafu(display("Failed to create gRPC channel"))]
508 CreateChannel {
509 #[snafu(implicit)]
510 location: Location,
511 source: common_grpc::error::Error,
512 },
513
514 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
515 BatchGet {
516 #[snafu(source)]
517 error: tonic::Status,
518 #[snafu(implicit)]
519 location: Location,
520 },
521
522 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
523 Range {
524 #[snafu(source)]
525 error: tonic::Status,
526 #[snafu(implicit)]
527 location: Location,
528 },
529
530 #[snafu(display("Response header not found"))]
531 ResponseHeaderNotFound {
532 #[snafu(implicit)]
533 location: Location,
534 },
535
536 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
537 IsNotLeader {
538 node_addr: String,
539 #[snafu(implicit)]
540 location: Location,
541 },
542
543 #[snafu(display("Invalid http body"))]
544 InvalidHttpBody {
545 #[snafu(source)]
546 error: http::Error,
547 #[snafu(implicit)]
548 location: Location,
549 },
550
551 #[snafu(display(
552 "The number of retries for the grpc call {} exceeded the limit, {}",
553 func_name,
554 retry_num
555 ))]
556 ExceededRetryLimit {
557 func_name: String,
558 retry_num: usize,
559 #[snafu(implicit)]
560 location: Location,
561 },
562
563 #[snafu(display("Invalid utf-8 value"))]
564 InvalidUtf8Value {
565 #[snafu(source)]
566 error: std::string::FromUtf8Error,
567 #[snafu(implicit)]
568 location: Location,
569 },
570
571 #[snafu(display("Missing required parameter, param: {:?}", param))]
572 MissingRequiredParameter { param: String },
573
574 #[snafu(display("Failed to start procedure manager"))]
575 StartProcedureManager {
576 #[snafu(implicit)]
577 location: Location,
578 source: common_procedure::Error,
579 },
580
581 #[snafu(display("Failed to stop procedure manager"))]
582 StopProcedureManager {
583 #[snafu(implicit)]
584 location: Location,
585 source: common_procedure::Error,
586 },
587
588 #[snafu(display("Failed to wait procedure done"))]
589 WaitProcedure {
590 #[snafu(implicit)]
591 location: Location,
592 source: common_procedure::Error,
593 },
594
595 #[snafu(display("Failed to query procedure state"))]
596 QueryProcedure {
597 #[snafu(implicit)]
598 location: Location,
599 source: common_procedure::Error,
600 },
601
602 #[snafu(display("Procedure not found: {pid}"))]
603 ProcedureNotFound {
604 #[snafu(implicit)]
605 location: Location,
606 pid: String,
607 },
608
609 #[snafu(display("Failed to submit procedure"))]
610 SubmitProcedure {
611 #[snafu(implicit)]
612 location: Location,
613 source: common_procedure::Error,
614 },
615
616 #[snafu(display("A prune task for topic {} is already running", topic))]
617 PruneTaskAlreadyRunning {
618 topic: String,
619 #[snafu(implicit)]
620 location: Location,
621 },
622
623 #[snafu(display("Schema already exists, name: {schema_name}"))]
624 SchemaAlreadyExists {
625 schema_name: String,
626 #[snafu(implicit)]
627 location: Location,
628 },
629
630 #[snafu(display("Table already exists: {table_name}"))]
631 TableAlreadyExists {
632 table_name: String,
633 #[snafu(implicit)]
634 location: Location,
635 },
636
637 #[snafu(display("Pusher not found: {pusher_id}"))]
638 PusherNotFound {
639 pusher_id: String,
640 #[snafu(implicit)]
641 location: Location,
642 },
643
644 #[snafu(display("Failed to push message: {err_msg}"))]
645 PushMessage {
646 err_msg: String,
647 #[snafu(implicit)]
648 location: Location,
649 },
650
651 #[snafu(display("Mailbox already closed: {id}"))]
652 MailboxClosed {
653 id: u64,
654 #[snafu(implicit)]
655 location: Location,
656 },
657
658 #[snafu(display("Mailbox timeout: {id}"))]
659 MailboxTimeout {
660 id: u64,
661 #[snafu(implicit)]
662 location: Location,
663 },
664
665 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
666 MailboxReceiver {
667 id: u64,
668 err_msg: String,
669 #[snafu(implicit)]
670 location: Location,
671 },
672
673 #[snafu(display("Mailbox channel closed: {channel}"))]
674 MailboxChannelClosed {
675 channel: Channel,
676 #[snafu(implicit)]
677 location: Location,
678 },
679
680 #[snafu(display("Missing request header"))]
681 MissingRequestHeader {
682 #[snafu(implicit)]
683 location: Location,
684 },
685
686 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
687 RegisterProcedureLoader {
688 type_name: String,
689 #[snafu(implicit)]
690 location: Location,
691 source: common_procedure::error::Error,
692 },
693
694 #[snafu(display(
695 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
696 mailbox_message,
697 reason
698 ))]
699 UnexpectedInstructionReply {
700 mailbox_message: String,
701 reason: String,
702 #[snafu(implicit)]
703 location: Location,
704 },
705
706 #[snafu(display("Expected to retry later, reason: {}", reason))]
707 RetryLater {
708 reason: String,
709 #[snafu(implicit)]
710 location: Location,
711 },
712
713 #[snafu(display("Expected to retry later, reason: {}", reason))]
714 RetryLaterWithSource {
715 reason: String,
716 #[snafu(implicit)]
717 location: Location,
718 source: BoxedError,
719 },
720
721 #[snafu(display("Failed to convert proto data"))]
722 ConvertProtoData {
723 #[snafu(implicit)]
724 location: Location,
725 source: common_meta::error::Error,
726 },
727
728 #[snafu(display("Other error"))]
731 Other {
732 source: BoxedError,
733 #[snafu(implicit)]
734 location: Location,
735 },
736
737 #[snafu(display("Table metadata manager error"))]
738 TableMetadataManager {
739 source: common_meta::error::Error,
740 #[snafu(implicit)]
741 location: Location,
742 },
743
744 #[snafu(display("Runtime switch manager error"))]
745 RuntimeSwitchManager {
746 source: common_meta::error::Error,
747 #[snafu(implicit)]
748 location: Location,
749 },
750
751 #[snafu(display("Keyvalue backend error"))]
752 KvBackend {
753 source: common_meta::error::Error,
754 #[snafu(implicit)]
755 location: Location,
756 },
757
758 #[snafu(display("Failed to publish message"))]
759 PublishMessage {
760 #[snafu(source)]
761 error: SendError<Message>,
762 #[snafu(implicit)]
763 location: Location,
764 },
765
766 #[snafu(display("Too many partitions"))]
767 TooManyPartitions {
768 #[snafu(implicit)]
769 location: Location,
770 },
771
772 #[snafu(display("Failed to create repartition subtasks"))]
773 RepartitionCreateSubtasks {
774 source: partition::error::Error,
775 #[snafu(implicit)]
776 location: Location,
777 },
778
779 #[snafu(display(
780 "Source partition expression '{}' does not match any existing region",
781 expr
782 ))]
783 RepartitionSourceExprMismatch {
784 expr: String,
785 #[snafu(implicit)]
786 location: Location,
787 },
788
789 #[snafu(display(
790 "Failed to get the state receiver for repartition subprocedure {}",
791 procedure_id
792 ))]
793 RepartitionSubprocedureStateReceiver {
794 procedure_id: ProcedureId,
795 #[snafu(source)]
796 source: common_procedure::Error,
797 #[snafu(implicit)]
798 location: Location,
799 },
800
801 #[snafu(display("Unsupported operation {}", operation))]
802 Unsupported {
803 operation: String,
804 #[snafu(implicit)]
805 location: Location,
806 },
807
808 #[snafu(display("Unexpected table route type: {}", err_msg))]
809 UnexpectedLogicalRouteTable {
810 #[snafu(implicit)]
811 location: Location,
812 err_msg: String,
813 source: common_meta::error::Error,
814 },
815
816 #[snafu(display("Failed to save cluster info"))]
817 SaveClusterInfo {
818 #[snafu(implicit)]
819 location: Location,
820 source: common_meta::error::Error,
821 },
822
823 #[snafu(display("Invalid cluster info format"))]
824 InvalidClusterInfoFormat {
825 #[snafu(implicit)]
826 location: Location,
827 source: common_meta::error::Error,
828 },
829
830 #[snafu(display("Invalid datanode stat format"))]
831 InvalidDatanodeStatFormat {
832 #[snafu(implicit)]
833 location: Location,
834 source: common_meta::error::Error,
835 },
836
837 #[snafu(display("Invalid node info format"))]
838 InvalidNodeInfoFormat {
839 #[snafu(implicit)]
840 location: Location,
841 source: common_meta::error::Error,
842 },
843
844 #[snafu(display("Failed to serialize options to TOML"))]
845 TomlFormat {
846 #[snafu(implicit)]
847 location: Location,
848 #[snafu(source(from(common_config::error::Error, Box::new)))]
849 source: Box<common_config::error::Error>,
850 },
851
852 #[cfg(feature = "pg_kvbackend")]
853 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
854 PostgresExecution {
855 #[snafu(source)]
856 error: tokio_postgres::Error,
857 sql: String,
858 #[snafu(implicit)]
859 location: Location,
860 },
861
862 #[cfg(feature = "pg_kvbackend")]
863 #[snafu(display("Failed to get Postgres client"))]
864 GetPostgresClient {
865 #[snafu(implicit)]
866 location: Location,
867 #[snafu(source)]
868 error: deadpool::managed::PoolError<tokio_postgres::Error>,
869 },
870
871 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
872 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
873 SqlExecutionTimeout {
874 #[snafu(implicit)]
875 location: Location,
876 sql: String,
877 duration: std::time::Duration,
878 },
879
880 #[cfg(feature = "pg_kvbackend")]
881 #[snafu(display("Failed to create connection pool for Postgres"))]
882 CreatePostgresPool {
883 #[snafu(source)]
884 error: deadpool_postgres::CreatePoolError,
885 #[snafu(implicit)]
886 location: Location,
887 },
888
889 #[cfg(feature = "pg_kvbackend")]
890 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
891 GetPostgresConnection {
892 reason: String,
893 #[snafu(implicit)]
894 location: Location,
895 },
896
897 #[cfg(feature = "mysql_kvbackend")]
898 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
899 MySqlExecution {
900 #[snafu(source)]
901 error: sqlx::Error,
902 #[snafu(implicit)]
903 location: Location,
904 sql: String,
905 },
906
907 #[cfg(feature = "mysql_kvbackend")]
908 #[snafu(display("Failed to create mysql pool"))]
909 CreateMySqlPool {
910 #[snafu(source)]
911 error: sqlx::Error,
912 #[snafu(implicit)]
913 location: Location,
914 },
915
916 #[cfg(feature = "mysql_kvbackend")]
917 #[snafu(display("Failed to acquire mysql client from pool"))]
918 AcquireMySqlClient {
919 #[snafu(source)]
920 error: sqlx::Error,
921 #[snafu(implicit)]
922 location: Location,
923 },
924
925 #[snafu(display("Handler not found: {}", name))]
926 HandlerNotFound {
927 name: String,
928 #[snafu(implicit)]
929 location: Location,
930 },
931
932 #[snafu(display("Flow state handler error"))]
933 FlowStateHandler {
934 #[snafu(implicit)]
935 location: Location,
936 source: common_meta::error::Error,
937 },
938
939 #[snafu(display("Failed to build wal provider"))]
940 BuildWalProvider {
941 #[snafu(implicit)]
942 location: Location,
943 source: common_meta::error::Error,
944 },
945
946 #[snafu(display("Failed to parse wal options"))]
947 ParseWalOptions {
948 #[snafu(implicit)]
949 location: Location,
950 source: common_meta::error::Error,
951 },
952
953 #[snafu(display("Failed to build kafka client."))]
954 BuildKafkaClient {
955 #[snafu(implicit)]
956 location: Location,
957 #[snafu(source)]
958 error: common_meta::error::Error,
959 },
960
961 #[snafu(display(
962 "Failed to build a Kafka partition client, topic: {}, partition: {}",
963 topic,
964 partition
965 ))]
966 BuildPartitionClient {
967 topic: String,
968 partition: i32,
969 #[snafu(implicit)]
970 location: Location,
971 #[snafu(source)]
972 error: rskafka::client::error::Error,
973 },
974
975 #[snafu(display(
976 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
977 topic,
978 partition,
979 offset
980 ))]
981 DeleteRecords {
982 #[snafu(implicit)]
983 location: Location,
984 #[snafu(source)]
985 error: rskafka::client::error::Error,
986 topic: String,
987 partition: i32,
988 offset: u64,
989 },
990
991 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
992 GetOffset {
993 topic: String,
994 #[snafu(implicit)]
995 location: Location,
996 #[snafu(source)]
997 error: rskafka::client::error::Error,
998 },
999
1000 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
1001 UpdateTopicNameValue {
1002 topic: String,
1003 #[snafu(implicit)]
1004 location: Location,
1005 #[snafu(source)]
1006 source: common_meta::error::Error,
1007 },
1008
1009 #[snafu(display("Failed to build tls options"))]
1010 BuildTlsOptions {
1011 #[snafu(implicit)]
1012 location: Location,
1013 #[snafu(source)]
1014 source: common_meta::error::Error,
1015 },
1016
1017 #[snafu(display(
1018 "Repartition group {} source region missing, region id: {}",
1019 group_id,
1020 region_id
1021 ))]
1022 RepartitionSourceRegionMissing {
1023 group_id: Uuid,
1024 region_id: RegionId,
1025 #[snafu(implicit)]
1026 location: Location,
1027 },
1028
1029 #[snafu(display(
1030 "Repartition group {} target region missing, region id: {}",
1031 group_id,
1032 region_id
1033 ))]
1034 RepartitionTargetRegionMissing {
1035 group_id: Uuid,
1036 region_id: RegionId,
1037 #[snafu(implicit)]
1038 location: Location,
1039 },
1040
1041 #[snafu(display("Failed to serialize partition expression"))]
1042 SerializePartitionExpr {
1043 #[snafu(source)]
1044 source: partition::error::Error,
1045 #[snafu(implicit)]
1046 location: Location,
1047 },
1048
1049 #[snafu(display("Failed to deserialize partition expression"))]
1050 DeserializePartitionExpr {
1051 #[snafu(source)]
1052 source: partition::error::Error,
1053 #[snafu(implicit)]
1054 location: Location,
1055 },
1056
1057 #[snafu(display("Empty partition expression"))]
1058 EmptyPartitionExpr {
1059 #[snafu(implicit)]
1060 location: Location,
1061 },
1062
1063 #[snafu(display(
1064 "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1065 region_id,
1066 expected,
1067 actual
1068 ))]
1069 PartitionExprMismatch {
1070 region_id: RegionId,
1071 expected: String,
1072 actual: String,
1073 #[snafu(implicit)]
1074 location: Location,
1075 },
1076
1077 #[snafu(display("Failed to allocate regions for table: {}", table_id))]
1078 AllocateRegions {
1079 #[snafu(implicit)]
1080 location: Location,
1081 table_id: TableId,
1082 #[snafu(source)]
1083 source: common_meta::error::Error,
1084 },
1085
1086 #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
1087 DeallocateRegions {
1088 #[snafu(implicit)]
1089 location: Location,
1090 table_id: TableId,
1091 #[snafu(source)]
1092 source: common_meta::error::Error,
1093 },
1094
1095 #[snafu(display("Failed to build create request for table: {}", table_id))]
1096 BuildCreateRequest {
1097 #[snafu(implicit)]
1098 location: Location,
1099 table_id: TableId,
1100 #[snafu(source)]
1101 source: common_meta::error::Error,
1102 },
1103
1104 #[snafu(display("Failed to allocate region routes for table: {}", table_id))]
1105 AllocateRegionRoutes {
1106 #[snafu(implicit)]
1107 location: Location,
1108 table_id: TableId,
1109 #[snafu(source)]
1110 source: common_meta::error::Error,
1111 },
1112
1113 #[snafu(display("Failed to allocate wal options for table: {}", table_id))]
1114 AllocateWalOptions {
1115 #[snafu(implicit)]
1116 location: Location,
1117 table_id: TableId,
1118 #[snafu(source)]
1119 source: common_meta::error::Error,
1120 },
1121}
1122
1123impl Error {
1124 pub fn is_retryable(&self) -> bool {
1126 matches!(
1127 self,
1128 Error::RetryLater { .. }
1129 | Error::RetryLaterWithSource { .. }
1130 | Error::MailboxTimeout { .. }
1131 )
1132 }
1133}
1134
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
1136
// Macro imported from `common_error`. Presumably generates the conversion from `Error` into
// a `tonic::Status` (inferred from the macro name — confirm against its definition).
define_into_tonic_status!(Error);
1138
1139impl ErrorExt for Error {
1140 fn status_code(&self) -> StatusCode {
1141 match self {
1142 Error::EtcdFailed { .. }
1143 | Error::ConnectEtcd { .. }
1144 | Error::FileIo { .. }
1145 | Error::TcpBind { .. }
1146 | Error::SerializeToJson { .. }
1147 | Error::DeserializeFromJson { .. }
1148 | Error::NoLeader { .. }
1149 | Error::LeaderLeaseExpired { .. }
1150 | Error::LeaderLeaseChanged { .. }
1151 | Error::CreateChannel { .. }
1152 | Error::BatchGet { .. }
1153 | Error::Range { .. }
1154 | Error::ResponseHeaderNotFound { .. }
1155 | Error::InvalidHttpBody { .. }
1156 | Error::ExceededRetryLimit { .. }
1157 | Error::SendShutdownSignal { .. }
1158 | Error::PushMessage { .. }
1159 | Error::MailboxClosed { .. }
1160 | Error::MailboxReceiver { .. }
1161 | Error::StartGrpc { .. }
1162 | Error::PublishMessage { .. }
1163 | Error::Join { .. }
1164 | Error::ChooseItems { .. }
1165 | Error::FlowStateHandler { .. }
1166 | Error::BuildWalProvider { .. }
1167 | Error::BuildPartitionClient { .. }
1168 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1169
1170 Error::DeleteRecords { .. }
1171 | Error::GetOffset { .. }
1172 | Error::PeerUnavailable { .. }
1173 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1174 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1175 Error::PruneTaskAlreadyRunning { .. }
1176 | Error::RetryLater { .. }
1177 | Error::MailboxChannelClosed { .. }
1178 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1179 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1180 Error::SerializePartitionExpr { source, .. }
1181 | Error::DeserializePartitionExpr { source, .. } => source.status_code(),
1182
1183 Error::Unsupported { .. } => StatusCode::Unsupported,
1184
1185 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1186
1187 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1188 Error::EmptyKey { .. }
1189 | Error::MissingRequiredParameter { .. }
1190 | Error::MissingRequestHeader { .. }
1191 | Error::InvalidLeaseKey { .. }
1192 | Error::InvalidStatKey { .. }
1193 | Error::InvalidInactiveRegionKey { .. }
1194 | Error::ParseNum { .. }
1195 | Error::ParseBool { .. }
1196 | Error::ParseAddr { .. }
1197 | Error::UnsupportedSelectorType { .. }
1198 | Error::InvalidArguments { .. }
1199 | Error::ProcedureNotFound { .. }
1200 | Error::TooManyPartitions { .. }
1201 | Error::TomlFormat { .. }
1202 | Error::HandlerNotFound { .. }
1203 | Error::LeaderPeerChanged { .. }
1204 | Error::RepartitionSourceRegionMissing { .. }
1205 | Error::RepartitionTargetRegionMissing { .. }
1206 | Error::PartitionExprMismatch { .. }
1207 | Error::RepartitionSourceExprMismatch { .. }
1208 | Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
1209 Error::LeaseKeyFromUtf8 { .. }
1210 | Error::LeaseValueFromUtf8 { .. }
1211 | Error::InvalidRegionKeyFromUtf8 { .. }
1212 | Error::TableRouteNotFound { .. }
1213 | Error::TableInfoNotFound { .. }
1214 | Error::DatanodeTableNotFound { .. }
1215 | Error::InvalidUtf8Value { .. }
1216 | Error::UnexpectedInstructionReply { .. }
1217 | Error::Unexpected { .. }
1218 | Error::RegionOperatingRace { .. }
1219 | Error::RegionRouteNotFound { .. }
1220 | Error::MigrationAbort { .. }
1221 | Error::MigrationRunning { .. }
1222 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1223 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1224 Error::SaveClusterInfo { source, .. }
1225 | Error::InvalidClusterInfoFormat { source, .. }
1226 | Error::InvalidDatanodeStatFormat { source, .. }
1227 | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1228 Error::InvalidateTableCache { source, .. } => source.status_code(),
1229 Error::SubmitProcedure { source, .. }
1230 | Error::WaitProcedure { source, .. }
1231 | Error::QueryProcedure { source, .. } => source.status_code(),
1232 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1233 source.status_code()
1234 }
1235 Error::StartProcedureManager { source, .. }
1236 | Error::StopProcedureManager { source, .. } => source.status_code(),
1237
1238 Error::ListCatalogs { source, .. }
1239 | Error::ListSchemas { source, .. }
1240 | Error::ListTables { source, .. } => source.status_code(),
1241 Error::StartTelemetryTask { source, .. } => source.status_code(),
1242
1243 Error::NextSequence { source, .. }
1244 | Error::SetNextSequence { source, .. }
1245 | Error::PeekSequence { source, .. } => source.status_code(),
1246 Error::DowngradeLeader { source, .. } => source.status_code(),
1247 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1248 Error::SubmitDdlTask { source, .. }
1249 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1250 Error::ConvertProtoData { source, .. }
1251 | Error::TableMetadataManager { source, .. }
1252 | Error::RuntimeSwitchManager { source, .. }
1253 | Error::KvBackend { source, .. }
1254 | Error::UnexpectedLogicalRouteTable { source, .. }
1255 | Error::UpdateTopicNameValue { source, .. }
1256 | Error::ParseWalOptions { source, .. } => source.status_code(),
1257 Error::ListActiveFrontends { source, .. }
1258 | Error::ListActiveDatanodes { source, .. }
1259 | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1260 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1261
1262 Error::InitMetadata { source, .. }
1263 | Error::InitDdlManager { source, .. }
1264 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1265
1266 Error::BuildTlsOptions { source, .. } => source.status_code(),
1267 Error::Other { source, .. } => source.status_code(),
1268 Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
1269 Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
1270 Error::AllocateRegions { source, .. } => source.status_code(),
1271 Error::DeallocateRegions { source, .. } => source.status_code(),
1272 Error::AllocateRegionRoutes { source, .. } => source.status_code(),
1273 Error::AllocateWalOptions { source, .. } => source.status_code(),
1274 Error::BuildCreateRequest { source, .. } => source.status_code(),
1275 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1276
1277 #[cfg(feature = "pg_kvbackend")]
1278 Error::CreatePostgresPool { .. }
1279 | Error::GetPostgresClient { .. }
1280 | Error::GetPostgresConnection { .. }
1281 | Error::PostgresExecution { .. } => StatusCode::Internal,
1282 #[cfg(feature = "mysql_kvbackend")]
1283 Error::MySqlExecution { .. }
1284 | Error::CreateMySqlPool { .. }
1285 | Error::ParseMySqlUrl { .. }
1286 | Error::DecodeSqlValue { .. }
1287 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1288 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1289 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1290 }
1291 }
1292
1293 fn as_any(&self) -> &dyn std::any::Any {
1294 self
1295 }
1296}
1297
1298pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1300 let mut err: &(dyn std::error::Error + 'static) = err_status;
1301
1302 loop {
1303 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1304 return Some(io_err);
1305 }
1306
1307 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1310 && let Some(io_err) = h2_err.get_io()
1311 {
1312 return Some(io_err);
1313 }
1314
1315 err = err.source()?;
1316 }
1317}