1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_procedure::ProcedureId;
21use common_runtime::JoinError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::metadata::TableId;
25use tokio::sync::mpsc::error::SendError;
26use tonic::codegen::http;
27use uuid::Uuid;
28
29use crate::metasrv::SelectTarget;
30use crate::pubsub::Message;
31use crate::service::mailbox::Channel;
32
33#[derive(Snafu)]
34#[snafu(visibility(pub))]
35#[stack_trace_debug]
36pub enum Error {
37 #[snafu(display("Failed to choose items"))]
38 ChooseItems {
39 #[snafu(implicit)]
40 location: Location,
41 #[snafu(source)]
42 error: rand::distr::weighted::Error,
43 },
44
45 #[snafu(display("Exceeded deadline, operation: {}", operation))]
46 ExceededDeadline {
47 #[snafu(implicit)]
48 location: Location,
49 operation: String,
50 },
51
52 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
53 PeerUnavailable {
54 #[snafu(implicit)]
55 location: Location,
56 peer_id: u64,
57 },
58
59 #[snafu(display("Failed to list active frontends"))]
60 ListActiveFrontends {
61 #[snafu(implicit)]
62 location: Location,
63 source: common_meta::error::Error,
64 },
65
66 #[snafu(display("Failed to list active datanodes"))]
67 ListActiveDatanodes {
68 #[snafu(implicit)]
69 location: Location,
70 source: common_meta::error::Error,
71 },
72
73 #[snafu(display("Failed to list active flownodes"))]
74 ListActiveFlownodes {
75 #[snafu(implicit)]
76 location: Location,
77 source: common_meta::error::Error,
78 },
79
80 #[snafu(display("No available frontend"))]
81 NoAvailableFrontend {
82 #[snafu(implicit)]
83 location: Location,
84 },
85
86 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
87 MigrationRunning {
88 #[snafu(implicit)]
89 location: Location,
90 region_id: RegionId,
91 },
92
93 #[snafu(display(
94 "The region migration procedure is completed for region: {}, target_peer: {}",
95 region_id,
96 target_peer_id
97 ))]
98 RegionMigrated {
99 #[snafu(implicit)]
100 location: Location,
101 region_id: RegionId,
102 target_peer_id: u64,
103 },
104
105 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
106 MigrationAbort {
107 #[snafu(implicit)]
108 location: Location,
109 reason: String,
110 },
111
112 #[snafu(display(
113 "Another procedure is operating the region: {} on peer: {}",
114 region_id,
115 peer_id
116 ))]
117 RegionOperatingRace {
118 #[snafu(implicit)]
119 location: Location,
120 peer_id: DatanodeId,
121 region_id: RegionId,
122 },
123
124 #[snafu(display("Failed to init ddl manager"))]
125 InitDdlManager {
126 #[snafu(implicit)]
127 location: Location,
128 source: common_meta::error::Error,
129 },
130
131 #[snafu(display("Failed to init reconciliation manager"))]
132 InitReconciliationManager {
133 #[snafu(implicit)]
134 location: Location,
135 source: common_meta::error::Error,
136 },
137
138 #[snafu(display("Failed to create default catalog and schema"))]
139 InitMetadata {
140 #[snafu(implicit)]
141 location: Location,
142 source: common_meta::error::Error,
143 },
144
145 #[snafu(display("Failed to allocate next sequence number"))]
146 NextSequence {
147 #[snafu(implicit)]
148 location: Location,
149 source: common_meta::error::Error,
150 },
151
152 #[snafu(display("Failed to set next sequence number"))]
153 SetNextSequence {
154 #[snafu(implicit)]
155 location: Location,
156 source: common_meta::error::Error,
157 },
158
159 #[snafu(display("Failed to peek sequence number"))]
160 PeekSequence {
161 #[snafu(implicit)]
162 location: Location,
163 source: common_meta::error::Error,
164 },
165
166 #[snafu(display("Failed to start telemetry task"))]
167 StartTelemetryTask {
168 #[snafu(implicit)]
169 location: Location,
170 source: common_runtime::error::Error,
171 },
172
173 #[snafu(display("Failed to submit ddl task"))]
174 SubmitDdlTask {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_meta::error::Error,
178 },
179
180 #[snafu(display("Failed to submit reconcile procedure"))]
181 SubmitReconcileProcedure {
182 #[snafu(implicit)]
183 location: Location,
184 source: common_meta::error::Error,
185 },
186
187 #[snafu(display("Failed to invalidate table cache"))]
188 InvalidateTableCache {
189 #[snafu(implicit)]
190 location: Location,
191 source: common_meta::error::Error,
192 },
193
194 #[snafu(display("Failed to list catalogs"))]
195 ListCatalogs {
196 #[snafu(implicit)]
197 location: Location,
198 source: BoxedError,
199 },
200
201 #[snafu(display("Failed to list {}'s schemas", catalog))]
202 ListSchemas {
203 #[snafu(implicit)]
204 location: Location,
205 catalog: String,
206 source: BoxedError,
207 },
208
209 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
210 ListTables {
211 #[snafu(implicit)]
212 location: Location,
213 catalog: String,
214 schema: String,
215 source: BoxedError,
216 },
217
218 #[snafu(display("Failed to join a future"))]
219 Join {
220 #[snafu(implicit)]
221 location: Location,
222 #[snafu(source)]
223 error: JoinError,
224 },
225
226 #[snafu(display(
227 "Failed to request {}, required: {}, but only {} available",
228 select_target,
229 required,
230 available
231 ))]
232 NoEnoughAvailableNode {
233 #[snafu(implicit)]
234 location: Location,
235 required: usize,
236 available: usize,
237 select_target: SelectTarget,
238 },
239
240 #[snafu(display("Failed to send shutdown signal"))]
241 SendShutdownSignal {
242 #[snafu(source)]
243 error: SendError<()>,
244 },
245
246 #[snafu(display("Failed to shutdown {} server", server))]
247 ShutdownServer {
248 #[snafu(implicit)]
249 location: Location,
250 source: servers::error::Error,
251 server: String,
252 },
253
254 #[snafu(display("Empty key is not allowed"))]
255 EmptyKey {
256 #[snafu(implicit)]
257 location: Location,
258 },
259
260 #[snafu(display("Failed to execute via Etcd"))]
261 EtcdFailed {
262 #[snafu(source)]
263 error: etcd_client::Error,
264 #[snafu(implicit)]
265 location: Location,
266 },
267
268 #[snafu(display("Failed to connect to Etcd"))]
269 ConnectEtcd {
270 #[snafu(source)]
271 error: etcd_client::Error,
272 #[snafu(implicit)]
273 location: Location,
274 },
275
276 #[snafu(display("Failed to read file: {}", path))]
277 FileIo {
278 #[snafu(source)]
279 error: std::io::Error,
280 #[snafu(implicit)]
281 location: Location,
282 path: String,
283 },
284
285 #[snafu(display("Failed to bind address {}", addr))]
286 TcpBind {
287 addr: String,
288 #[snafu(source)]
289 error: std::io::Error,
290 #[snafu(implicit)]
291 location: Location,
292 },
293
294 #[snafu(display("Failed to start gRPC server"))]
295 StartGrpc {
296 #[snafu(source)]
297 error: tonic::transport::Error,
298 #[snafu(implicit)]
299 location: Location,
300 },
301
302 #[snafu(display("Failed to start http server"))]
303 StartHttp {
304 #[snafu(implicit)]
305 location: Location,
306 source: servers::error::Error,
307 },
308
309 #[snafu(display("Failed to parse address {}", addr))]
310 ParseAddr {
311 addr: String,
312 #[snafu(source)]
313 error: std::net::AddrParseError,
314 },
315
316 #[snafu(display("Invalid lease key: {}", key))]
317 InvalidLeaseKey {
318 key: String,
319 #[snafu(implicit)]
320 location: Location,
321 },
322
323 #[snafu(display("Invalid datanode stat key: {}", key))]
324 InvalidStatKey {
325 key: String,
326 #[snafu(implicit)]
327 location: Location,
328 },
329
330 #[snafu(display("Invalid inactive region key: {}", key))]
331 InvalidInactiveRegionKey {
332 key: String,
333 #[snafu(implicit)]
334 location: Location,
335 },
336
337 #[snafu(display("Failed to parse lease key from utf8"))]
338 LeaseKeyFromUtf8 {
339 #[snafu(source)]
340 error: std::string::FromUtf8Error,
341 #[snafu(implicit)]
342 location: Location,
343 },
344
345 #[snafu(display("Failed to parse lease value from utf8"))]
346 LeaseValueFromUtf8 {
347 #[snafu(source)]
348 error: std::string::FromUtf8Error,
349 #[snafu(implicit)]
350 location: Location,
351 },
352
353 #[snafu(display("Failed to parse invalid region key from utf8"))]
354 InvalidRegionKeyFromUtf8 {
355 #[snafu(source)]
356 error: std::string::FromUtf8Error,
357 #[snafu(implicit)]
358 location: Location,
359 },
360
361 #[snafu(display("Failed to serialize to json: {}", input))]
362 SerializeToJson {
363 input: String,
364 #[snafu(source)]
365 error: serde_json::error::Error,
366 #[snafu(implicit)]
367 location: Location,
368 },
369
370 #[snafu(display("Failed to deserialize from json: {}", input))]
371 DeserializeFromJson {
372 input: String,
373 #[snafu(source)]
374 error: serde_json::error::Error,
375 #[snafu(implicit)]
376 location: Location,
377 },
378
379 #[snafu(display("Failed to serialize config"))]
380 SerializeConfig {
381 #[snafu(source)]
382 error: serde_json::error::Error,
383 #[snafu(implicit)]
384 location: Location,
385 },
386
387 #[snafu(display("Failed to parse number: {}", err_msg))]
388 ParseNum {
389 err_msg: String,
390 #[snafu(source)]
391 error: std::num::ParseIntError,
392 #[snafu(implicit)]
393 location: Location,
394 },
395
396 #[snafu(display("Failed to parse bool: {}", err_msg))]
397 ParseBool {
398 err_msg: String,
399 #[snafu(source)]
400 error: std::str::ParseBoolError,
401 #[snafu(implicit)]
402 location: Location,
403 },
404
405 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
406 DowngradeLeader {
407 region_id: RegionId,
408 #[snafu(implicit)]
409 location: Location,
410 #[snafu(source)]
411 source: BoxedError,
412 },
413
414 #[snafu(display("Region's leader peer changed: {}", msg))]
415 LeaderPeerChanged {
416 msg: String,
417 #[snafu(implicit)]
418 location: Location,
419 },
420
421 #[snafu(display("Invalid arguments: {}", err_msg))]
422 InvalidArguments {
423 err_msg: String,
424 #[snafu(implicit)]
425 location: Location,
426 },
427
428 #[cfg(feature = "mysql_kvbackend")]
429 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
430 ParseMySqlUrl {
431 #[snafu(source)]
432 error: sqlx::error::Error,
433 mysql_url: String,
434 #[snafu(implicit)]
435 location: Location,
436 },
437
438 #[cfg(feature = "mysql_kvbackend")]
439 #[snafu(display("Failed to decode sql value"))]
440 DecodeSqlValue {
441 #[snafu(source)]
442 error: sqlx::error::Error,
443 #[snafu(implicit)]
444 location: Location,
445 },
446
447 #[snafu(display("Failed to find table route for {table_id}"))]
448 TableRouteNotFound {
449 table_id: TableId,
450 #[snafu(implicit)]
451 location: Location,
452 },
453
454 #[snafu(display("Failed to find table route for {region_id}"))]
455 RegionRouteNotFound {
456 region_id: RegionId,
457 #[snafu(implicit)]
458 location: Location,
459 },
460
461 #[snafu(display("Table info not found: {}", table_id))]
462 TableInfoNotFound {
463 table_id: TableId,
464 #[snafu(implicit)]
465 location: Location,
466 },
467
468 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
469 DatanodeTableNotFound {
470 table_id: TableId,
471 datanode_id: DatanodeId,
472 #[snafu(implicit)]
473 location: Location,
474 },
475
476 #[snafu(display("Metasrv has no leader at this moment"))]
477 NoLeader {
478 #[snafu(implicit)]
479 location: Location,
480 },
481
482 #[snafu(display("Leader lease expired"))]
483 LeaderLeaseExpired {
484 #[snafu(implicit)]
485 location: Location,
486 },
487
488 #[snafu(display("Leader lease changed during election"))]
489 LeaderLeaseChanged {
490 #[snafu(implicit)]
491 location: Location,
492 },
493
494 #[snafu(display("Table {} not found", name))]
495 TableNotFound {
496 name: String,
497 #[snafu(implicit)]
498 location: Location,
499 },
500
501 #[snafu(display("Unsupported selector type, {}", selector_type))]
502 UnsupportedSelectorType {
503 selector_type: String,
504 #[snafu(implicit)]
505 location: Location,
506 },
507
508 #[snafu(display("Unexpected, violated: {violated}"))]
509 Unexpected {
510 violated: String,
511 #[snafu(implicit)]
512 location: Location,
513 },
514
515 #[snafu(display("Failed to create gRPC channel"))]
516 CreateChannel {
517 #[snafu(implicit)]
518 location: Location,
519 source: common_grpc::error::Error,
520 },
521
522 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
523 BatchGet {
524 #[snafu(source)]
525 error: tonic::Status,
526 #[snafu(implicit)]
527 location: Location,
528 },
529
530 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
531 Range {
532 #[snafu(source)]
533 error: tonic::Status,
534 #[snafu(implicit)]
535 location: Location,
536 },
537
538 #[snafu(display("Response header not found"))]
539 ResponseHeaderNotFound {
540 #[snafu(implicit)]
541 location: Location,
542 },
543
544 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
545 IsNotLeader {
546 node_addr: String,
547 #[snafu(implicit)]
548 location: Location,
549 },
550
551 #[snafu(display("Invalid http body"))]
552 InvalidHttpBody {
553 #[snafu(source)]
554 error: http::Error,
555 #[snafu(implicit)]
556 location: Location,
557 },
558
559 #[snafu(display(
560 "The number of retries for the grpc call {} exceeded the limit, {}",
561 func_name,
562 retry_num
563 ))]
564 ExceededRetryLimit {
565 func_name: String,
566 retry_num: usize,
567 #[snafu(implicit)]
568 location: Location,
569 },
570
571 #[snafu(display("Invalid utf-8 value"))]
572 InvalidUtf8Value {
573 #[snafu(source)]
574 error: std::string::FromUtf8Error,
575 #[snafu(implicit)]
576 location: Location,
577 },
578
579 #[snafu(display("Missing required parameter, param: {:?}", param))]
580 MissingRequiredParameter { param: String },
581
582 #[snafu(display("Failed to start procedure manager"))]
583 StartProcedureManager {
584 #[snafu(implicit)]
585 location: Location,
586 source: common_procedure::Error,
587 },
588
589 #[snafu(display("Failed to stop procedure manager"))]
590 StopProcedureManager {
591 #[snafu(implicit)]
592 location: Location,
593 source: common_procedure::Error,
594 },
595
596 #[snafu(display("Failed to wait procedure done"))]
597 WaitProcedure {
598 #[snafu(implicit)]
599 location: Location,
600 source: common_procedure::Error,
601 },
602
603 #[snafu(display("Failed to query procedure state"))]
604 QueryProcedure {
605 #[snafu(implicit)]
606 location: Location,
607 source: common_procedure::Error,
608 },
609
610 #[snafu(display("Procedure not found: {pid}"))]
611 ProcedureNotFound {
612 #[snafu(implicit)]
613 location: Location,
614 pid: String,
615 },
616
617 #[snafu(display("Failed to submit procedure"))]
618 SubmitProcedure {
619 #[snafu(implicit)]
620 location: Location,
621 source: common_procedure::Error,
622 },
623
624 #[snafu(display("A prune task for topic {} is already running", topic))]
625 PruneTaskAlreadyRunning {
626 topic: String,
627 #[snafu(implicit)]
628 location: Location,
629 },
630
631 #[snafu(display("Schema already exists, name: {schema_name}"))]
632 SchemaAlreadyExists {
633 schema_name: String,
634 #[snafu(implicit)]
635 location: Location,
636 },
637
638 #[snafu(display("Table already exists: {table_name}"))]
639 TableAlreadyExists {
640 table_name: String,
641 #[snafu(implicit)]
642 location: Location,
643 },
644
645 #[snafu(display("Pusher not found: {pusher_id}"))]
646 PusherNotFound {
647 pusher_id: String,
648 #[snafu(implicit)]
649 location: Location,
650 },
651
652 #[snafu(display("Failed to push message: {err_msg}"))]
653 PushMessage {
654 err_msg: String,
655 #[snafu(implicit)]
656 location: Location,
657 },
658
659 #[snafu(display("Mailbox already closed: {id}"))]
660 MailboxClosed {
661 id: u64,
662 #[snafu(implicit)]
663 location: Location,
664 },
665
666 #[snafu(display("Mailbox timeout: {id}"))]
667 MailboxTimeout {
668 id: u64,
669 #[snafu(implicit)]
670 location: Location,
671 },
672
673 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
674 MailboxReceiver {
675 id: u64,
676 err_msg: String,
677 #[snafu(implicit)]
678 location: Location,
679 },
680
681 #[snafu(display("Mailbox channel closed: {channel}"))]
682 MailboxChannelClosed {
683 channel: Channel,
684 #[snafu(implicit)]
685 location: Location,
686 },
687
688 #[snafu(display("Missing request header"))]
689 MissingRequestHeader {
690 #[snafu(implicit)]
691 location: Location,
692 },
693
694 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
695 RegisterProcedureLoader {
696 type_name: String,
697 #[snafu(implicit)]
698 location: Location,
699 source: common_procedure::error::Error,
700 },
701
702 #[snafu(display(
703 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
704 mailbox_message,
705 reason
706 ))]
707 UnexpectedInstructionReply {
708 mailbox_message: String,
709 reason: String,
710 #[snafu(implicit)]
711 location: Location,
712 },
713
714 #[snafu(display("Expected to retry later, reason: {}", reason))]
715 RetryLater {
716 reason: String,
717 #[snafu(implicit)]
718 location: Location,
719 },
720
721 #[snafu(display("Expected to retry later, reason: {}", reason))]
722 RetryLaterWithSource {
723 reason: String,
724 #[snafu(implicit)]
725 location: Location,
726 source: BoxedError,
727 },
728
729 #[snafu(display("Failed to convert proto data"))]
730 ConvertProtoData {
731 #[snafu(implicit)]
732 location: Location,
733 source: common_meta::error::Error,
734 },
735
736 #[snafu(display("Other error"))]
739 Other {
740 source: BoxedError,
741 #[snafu(implicit)]
742 location: Location,
743 },
744
745 #[snafu(display("Table metadata manager error"))]
746 TableMetadataManager {
747 source: common_meta::error::Error,
748 #[snafu(implicit)]
749 location: Location,
750 },
751
752 #[snafu(display("Runtime switch manager error"))]
753 RuntimeSwitchManager {
754 source: common_meta::error::Error,
755 #[snafu(implicit)]
756 location: Location,
757 },
758
759 #[snafu(display("Keyvalue backend error"))]
760 KvBackend {
761 source: common_meta::error::Error,
762 #[snafu(implicit)]
763 location: Location,
764 },
765
766 #[snafu(display("Failed to publish message"))]
767 PublishMessage {
768 #[snafu(source)]
769 error: SendError<Message>,
770 #[snafu(implicit)]
771 location: Location,
772 },
773
774 #[snafu(display("Too many partitions"))]
775 TooManyPartitions {
776 #[snafu(implicit)]
777 location: Location,
778 },
779
780 #[snafu(display("Failed to create repartition subtasks"))]
781 RepartitionCreateSubtasks {
782 source: partition::error::Error,
783 #[snafu(implicit)]
784 location: Location,
785 },
786
787 #[snafu(display(
788 "Source partition expression '{}' does not match any existing region",
789 expr
790 ))]
791 RepartitionSourceExprMismatch {
792 expr: String,
793 #[snafu(implicit)]
794 location: Location,
795 },
796
797 #[snafu(display(
798 "Failed to get the state receiver for repartition subprocedure {}",
799 procedure_id
800 ))]
801 RepartitionSubprocedureStateReceiver {
802 procedure_id: ProcedureId,
803 #[snafu(source)]
804 source: common_procedure::Error,
805 #[snafu(implicit)]
806 location: Location,
807 },
808
809 #[snafu(display("Unsupported operation {}", operation))]
810 Unsupported {
811 operation: String,
812 #[snafu(implicit)]
813 location: Location,
814 },
815
816 #[snafu(display("Unexpected table route type: {}", err_msg))]
817 UnexpectedLogicalRouteTable {
818 #[snafu(implicit)]
819 location: Location,
820 err_msg: String,
821 source: common_meta::error::Error,
822 },
823
824 #[snafu(display("Failed to save cluster info"))]
825 SaveClusterInfo {
826 #[snafu(implicit)]
827 location: Location,
828 source: common_meta::error::Error,
829 },
830
831 #[snafu(display("Invalid cluster info format"))]
832 InvalidClusterInfoFormat {
833 #[snafu(implicit)]
834 location: Location,
835 source: common_meta::error::Error,
836 },
837
838 #[snafu(display("Invalid datanode stat format"))]
839 InvalidDatanodeStatFormat {
840 #[snafu(implicit)]
841 location: Location,
842 source: common_meta::error::Error,
843 },
844
845 #[snafu(display("Invalid node info format"))]
846 InvalidNodeInfoFormat {
847 #[snafu(implicit)]
848 location: Location,
849 source: common_meta::error::Error,
850 },
851
852 #[snafu(display("Failed to serialize options to TOML"))]
853 TomlFormat {
854 #[snafu(implicit)]
855 location: Location,
856 #[snafu(source(from(common_config::error::Error, Box::new)))]
857 source: Box<common_config::error::Error>,
858 },
859
860 #[cfg(feature = "pg_kvbackend")]
861 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
862 PostgresExecution {
863 #[snafu(source)]
864 error: tokio_postgres::Error,
865 sql: String,
866 #[snafu(implicit)]
867 location: Location,
868 },
869
870 #[cfg(feature = "pg_kvbackend")]
871 #[snafu(display("Failed to get Postgres client"))]
872 GetPostgresClient {
873 #[snafu(implicit)]
874 location: Location,
875 #[snafu(source)]
876 error: deadpool::managed::PoolError<tokio_postgres::Error>,
877 },
878
879 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
880 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
881 SqlExecutionTimeout {
882 #[snafu(implicit)]
883 location: Location,
884 sql: String,
885 duration: std::time::Duration,
886 },
887
888 #[cfg(feature = "pg_kvbackend")]
889 #[snafu(display("Failed to create connection pool for Postgres"))]
890 CreatePostgresPool {
891 #[snafu(source)]
892 error: deadpool_postgres::CreatePoolError,
893 #[snafu(implicit)]
894 location: Location,
895 },
896
897 #[cfg(feature = "pg_kvbackend")]
898 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
899 GetPostgresConnection {
900 reason: String,
901 #[snafu(implicit)]
902 location: Location,
903 },
904
905 #[cfg(feature = "mysql_kvbackend")]
906 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
907 MySqlExecution {
908 #[snafu(source)]
909 error: sqlx::Error,
910 #[snafu(implicit)]
911 location: Location,
912 sql: String,
913 },
914
915 #[cfg(feature = "mysql_kvbackend")]
916 #[snafu(display("Failed to create mysql pool"))]
917 CreateMySqlPool {
918 #[snafu(source)]
919 error: sqlx::Error,
920 #[snafu(implicit)]
921 location: Location,
922 },
923
924 #[cfg(feature = "mysql_kvbackend")]
925 #[snafu(display("Failed to acquire mysql client from pool"))]
926 AcquireMySqlClient {
927 #[snafu(source)]
928 error: sqlx::Error,
929 #[snafu(implicit)]
930 location: Location,
931 },
932
933 #[snafu(display("Handler not found: {}", name))]
934 HandlerNotFound {
935 name: String,
936 #[snafu(implicit)]
937 location: Location,
938 },
939
940 #[snafu(display("Flow state handler error"))]
941 FlowStateHandler {
942 #[snafu(implicit)]
943 location: Location,
944 source: common_meta::error::Error,
945 },
946
947 #[snafu(display("Failed to build wal provider"))]
948 BuildWalProvider {
949 #[snafu(implicit)]
950 location: Location,
951 source: common_meta::error::Error,
952 },
953
954 #[snafu(display("Failed to parse wal options"))]
955 ParseWalOptions {
956 #[snafu(implicit)]
957 location: Location,
958 source: common_meta::error::Error,
959 },
960
961 #[snafu(display("Failed to build kafka client."))]
962 BuildKafkaClient {
963 #[snafu(implicit)]
964 location: Location,
965 #[snafu(source)]
966 error: common_meta::error::Error,
967 },
968
969 #[snafu(display(
970 "Failed to build a Kafka partition client, topic: {}, partition: {}",
971 topic,
972 partition
973 ))]
974 BuildPartitionClient {
975 topic: String,
976 partition: i32,
977 #[snafu(implicit)]
978 location: Location,
979 #[snafu(source)]
980 error: rskafka::client::error::Error,
981 },
982
983 #[snafu(display(
984 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
985 topic,
986 partition,
987 offset
988 ))]
989 DeleteRecords {
990 #[snafu(implicit)]
991 location: Location,
992 #[snafu(source)]
993 error: rskafka::client::error::Error,
994 topic: String,
995 partition: i32,
996 offset: u64,
997 },
998
999 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
1000 GetOffset {
1001 topic: String,
1002 #[snafu(implicit)]
1003 location: Location,
1004 #[snafu(source)]
1005 error: rskafka::client::error::Error,
1006 },
1007
1008 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
1009 UpdateTopicNameValue {
1010 topic: String,
1011 #[snafu(implicit)]
1012 location: Location,
1013 #[snafu(source)]
1014 source: common_meta::error::Error,
1015 },
1016
1017 #[snafu(display("Failed to build tls options"))]
1018 BuildTlsOptions {
1019 #[snafu(implicit)]
1020 location: Location,
1021 #[snafu(source)]
1022 source: common_meta::error::Error,
1023 },
1024
1025 #[snafu(display(
1026 "Repartition group {} source region missing, region id: {}",
1027 group_id,
1028 region_id
1029 ))]
1030 RepartitionSourceRegionMissing {
1031 group_id: Uuid,
1032 region_id: RegionId,
1033 #[snafu(implicit)]
1034 location: Location,
1035 },
1036
1037 #[snafu(display(
1038 "Repartition group {} target region missing, region id: {}",
1039 group_id,
1040 region_id
1041 ))]
1042 RepartitionTargetRegionMissing {
1043 group_id: Uuid,
1044 region_id: RegionId,
1045 #[snafu(implicit)]
1046 location: Location,
1047 },
1048
1049 #[snafu(display("Failed to serialize partition expression"))]
1050 SerializePartitionExpr {
1051 #[snafu(source)]
1052 source: partition::error::Error,
1053 #[snafu(implicit)]
1054 location: Location,
1055 },
1056
1057 #[snafu(display("Failed to deserialize partition expression"))]
1058 DeserializePartitionExpr {
1059 #[snafu(source)]
1060 source: partition::error::Error,
1061 #[snafu(implicit)]
1062 location: Location,
1063 },
1064
1065 #[snafu(display("Empty partition expression"))]
1066 EmptyPartitionExpr {
1067 #[snafu(implicit)]
1068 location: Location,
1069 },
1070
1071 #[snafu(display(
1072 "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1073 region_id,
1074 expected,
1075 actual
1076 ))]
1077 PartitionExprMismatch {
1078 region_id: RegionId,
1079 expected: String,
1080 actual: String,
1081 #[snafu(implicit)]
1082 location: Location,
1083 },
1084
1085 #[snafu(display("Failed to allocate regions for table: {}", table_id))]
1086 AllocateRegions {
1087 #[snafu(implicit)]
1088 location: Location,
1089 table_id: TableId,
1090 #[snafu(source)]
1091 source: common_meta::error::Error,
1092 },
1093
1094 #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
1095 DeallocateRegions {
1096 #[snafu(implicit)]
1097 location: Location,
1098 table_id: TableId,
1099 #[snafu(source)]
1100 source: common_meta::error::Error,
1101 },
1102
1103 #[snafu(display("Failed to build create request for table: {}", table_id))]
1104 BuildCreateRequest {
1105 #[snafu(implicit)]
1106 location: Location,
1107 table_id: TableId,
1108 #[snafu(source)]
1109 source: common_meta::error::Error,
1110 },
1111
1112 #[snafu(display("Failed to allocate region routes for table: {}", table_id))]
1113 AllocateRegionRoutes {
1114 #[snafu(implicit)]
1115 location: Location,
1116 table_id: TableId,
1117 #[snafu(source)]
1118 source: common_meta::error::Error,
1119 },
1120
1121 #[snafu(display("Failed to allocate wal options for table: {}", table_id))]
1122 AllocateWalOptions {
1123 #[snafu(implicit)]
1124 location: Location,
1125 table_id: TableId,
1126 #[snafu(source)]
1127 source: common_meta::error::Error,
1128 },
1129}
1130
1131impl Error {
1132 pub fn is_retryable(&self) -> bool {
1134 matches!(
1135 self,
1136 Error::RetryLater { .. }
1137 | Error::RetryLaterWithSource { .. }
1138 | Error::MailboxTimeout { .. }
1139 )
1140 }
1141}
1142
1143pub type Result<T> = std::result::Result<T, Error>;
1144
1145define_into_tonic_status!(Error);
1146
1147impl ErrorExt for Error {
1148 fn status_code(&self) -> StatusCode {
1149 match self {
1150 Error::EtcdFailed { .. }
1151 | Error::ConnectEtcd { .. }
1152 | Error::FileIo { .. }
1153 | Error::TcpBind { .. }
1154 | Error::SerializeConfig { .. }
1155 | Error::SerializeToJson { .. }
1156 | Error::DeserializeFromJson { .. }
1157 | Error::NoLeader { .. }
1158 | Error::LeaderLeaseExpired { .. }
1159 | Error::LeaderLeaseChanged { .. }
1160 | Error::CreateChannel { .. }
1161 | Error::BatchGet { .. }
1162 | Error::Range { .. }
1163 | Error::ResponseHeaderNotFound { .. }
1164 | Error::InvalidHttpBody { .. }
1165 | Error::ExceededRetryLimit { .. }
1166 | Error::SendShutdownSignal { .. }
1167 | Error::PushMessage { .. }
1168 | Error::MailboxClosed { .. }
1169 | Error::MailboxReceiver { .. }
1170 | Error::StartGrpc { .. }
1171 | Error::PublishMessage { .. }
1172 | Error::Join { .. }
1173 | Error::ChooseItems { .. }
1174 | Error::FlowStateHandler { .. }
1175 | Error::BuildWalProvider { .. }
1176 | Error::BuildPartitionClient { .. }
1177 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1178
1179 Error::DeleteRecords { .. }
1180 | Error::GetOffset { .. }
1181 | Error::PeerUnavailable { .. }
1182 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1183 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1184 Error::PruneTaskAlreadyRunning { .. }
1185 | Error::RetryLater { .. }
1186 | Error::MailboxChannelClosed { .. }
1187 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1188 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1189 Error::SerializePartitionExpr { source, .. }
1190 | Error::DeserializePartitionExpr { source, .. } => source.status_code(),
1191
1192 Error::Unsupported { .. } => StatusCode::Unsupported,
1193
1194 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1195
1196 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1197 Error::EmptyKey { .. }
1198 | Error::MissingRequiredParameter { .. }
1199 | Error::MissingRequestHeader { .. }
1200 | Error::InvalidLeaseKey { .. }
1201 | Error::InvalidStatKey { .. }
1202 | Error::InvalidInactiveRegionKey { .. }
1203 | Error::ParseNum { .. }
1204 | Error::ParseBool { .. }
1205 | Error::ParseAddr { .. }
1206 | Error::UnsupportedSelectorType { .. }
1207 | Error::InvalidArguments { .. }
1208 | Error::ProcedureNotFound { .. }
1209 | Error::TooManyPartitions { .. }
1210 | Error::TomlFormat { .. }
1211 | Error::HandlerNotFound { .. }
1212 | Error::LeaderPeerChanged { .. }
1213 | Error::RepartitionSourceRegionMissing { .. }
1214 | Error::RepartitionTargetRegionMissing { .. }
1215 | Error::PartitionExprMismatch { .. }
1216 | Error::RepartitionSourceExprMismatch { .. }
1217 | Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
1218 Error::LeaseKeyFromUtf8 { .. }
1219 | Error::LeaseValueFromUtf8 { .. }
1220 | Error::InvalidRegionKeyFromUtf8 { .. }
1221 | Error::TableRouteNotFound { .. }
1222 | Error::TableInfoNotFound { .. }
1223 | Error::DatanodeTableNotFound { .. }
1224 | Error::InvalidUtf8Value { .. }
1225 | Error::UnexpectedInstructionReply { .. }
1226 | Error::Unexpected { .. }
1227 | Error::RegionOperatingRace { .. }
1228 | Error::RegionRouteNotFound { .. }
1229 | Error::MigrationAbort { .. }
1230 | Error::MigrationRunning { .. }
1231 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1232 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1233 Error::SaveClusterInfo { source, .. }
1234 | Error::InvalidClusterInfoFormat { source, .. }
1235 | Error::InvalidDatanodeStatFormat { source, .. }
1236 | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1237 Error::InvalidateTableCache { source, .. } => source.status_code(),
1238 Error::SubmitProcedure { source, .. }
1239 | Error::WaitProcedure { source, .. }
1240 | Error::QueryProcedure { source, .. } => source.status_code(),
1241 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1242 source.status_code()
1243 }
1244 Error::StartProcedureManager { source, .. }
1245 | Error::StopProcedureManager { source, .. } => source.status_code(),
1246
1247 Error::ListCatalogs { source, .. }
1248 | Error::ListSchemas { source, .. }
1249 | Error::ListTables { source, .. } => source.status_code(),
1250 Error::StartTelemetryTask { source, .. } => source.status_code(),
1251
1252 Error::NextSequence { source, .. }
1253 | Error::SetNextSequence { source, .. }
1254 | Error::PeekSequence { source, .. } => source.status_code(),
1255 Error::DowngradeLeader { source, .. } => source.status_code(),
1256 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1257 Error::SubmitDdlTask { source, .. }
1258 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1259 Error::ConvertProtoData { source, .. }
1260 | Error::TableMetadataManager { source, .. }
1261 | Error::RuntimeSwitchManager { source, .. }
1262 | Error::KvBackend { source, .. }
1263 | Error::UnexpectedLogicalRouteTable { source, .. }
1264 | Error::UpdateTopicNameValue { source, .. }
1265 | Error::ParseWalOptions { source, .. } => source.status_code(),
1266 Error::ListActiveFrontends { source, .. }
1267 | Error::ListActiveDatanodes { source, .. }
1268 | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1269 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1270
1271 Error::InitMetadata { source, .. }
1272 | Error::InitDdlManager { source, .. }
1273 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1274
1275 Error::BuildTlsOptions { source, .. } => source.status_code(),
1276 Error::Other { source, .. } => source.status_code(),
1277 Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
1278 Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
1279 Error::AllocateRegions { source, .. } => source.status_code(),
1280 Error::DeallocateRegions { source, .. } => source.status_code(),
1281 Error::AllocateRegionRoutes { source, .. } => source.status_code(),
1282 Error::AllocateWalOptions { source, .. } => source.status_code(),
1283 Error::BuildCreateRequest { source, .. } => source.status_code(),
1284 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1285
1286 #[cfg(feature = "pg_kvbackend")]
1287 Error::CreatePostgresPool { .. }
1288 | Error::GetPostgresClient { .. }
1289 | Error::GetPostgresConnection { .. }
1290 | Error::PostgresExecution { .. } => StatusCode::Internal,
1291 #[cfg(feature = "mysql_kvbackend")]
1292 Error::MySqlExecution { .. }
1293 | Error::CreateMySqlPool { .. }
1294 | Error::ParseMySqlUrl { .. }
1295 | Error::DecodeSqlValue { .. }
1296 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1297 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1298 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1299 }
1300 }
1301
1302 fn as_any(&self) -> &dyn std::any::Any {
1303 self
1304 }
1305}
1306
1307pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1309 let mut err: &(dyn std::error::Error + 'static) = err_status;
1310
1311 loop {
1312 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1313 return Some(io_err);
1314 }
1315
1316 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1319 && let Some(io_err) = h2_err.get_io()
1320 {
1321 return Some(io_err);
1322 }
1323
1324 err = err.source()?;
1325 }
1326}