1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Failed to choose items"))]
36 ChooseItems {
37 #[snafu(implicit)]
38 location: Location,
39 #[snafu(source)]
40 error: rand::distr::weighted::Error,
41 },
42
43 #[snafu(display("Exceeded deadline, operation: {}", operation))]
44 ExceededDeadline {
45 #[snafu(implicit)]
46 location: Location,
47 operation: String,
48 },
49
50 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51 PeerUnavailable {
52 #[snafu(implicit)]
53 location: Location,
54 peer_id: u64,
55 },
56
57 #[snafu(display("Failed to list active frontends"))]
58 ListActiveFrontends {
59 #[snafu(implicit)]
60 location: Location,
61 source: common_meta::error::Error,
62 },
63
64 #[snafu(display("Failed to list active datanodes"))]
65 ListActiveDatanodes {
66 #[snafu(implicit)]
67 location: Location,
68 source: common_meta::error::Error,
69 },
70
71 #[snafu(display("Failed to list active flownodes"))]
72 ListActiveFlownodes {
73 #[snafu(implicit)]
74 location: Location,
75 source: common_meta::error::Error,
76 },
77
78 #[snafu(display("No available frontend"))]
79 NoAvailableFrontend {
80 #[snafu(implicit)]
81 location: Location,
82 },
83
84 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
85 MigrationRunning {
86 #[snafu(implicit)]
87 location: Location,
88 region_id: RegionId,
89 },
90
91 #[snafu(display(
92 "The region migration procedure is completed for region: {}, target_peer: {}",
93 region_id,
94 target_peer_id
95 ))]
96 RegionMigrated {
97 #[snafu(implicit)]
98 location: Location,
99 region_id: RegionId,
100 target_peer_id: u64,
101 },
102
103 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
104 MigrationAbort {
105 #[snafu(implicit)]
106 location: Location,
107 reason: String,
108 },
109
110 #[snafu(display(
111 "Another procedure is opening the region: {} on peer: {}",
112 region_id,
113 peer_id
114 ))]
115 RegionOpeningRace {
116 #[snafu(implicit)]
117 location: Location,
118 peer_id: DatanodeId,
119 region_id: RegionId,
120 },
121
122 #[snafu(display("Failed to init ddl manager"))]
123 InitDdlManager {
124 #[snafu(implicit)]
125 location: Location,
126 source: common_meta::error::Error,
127 },
128
129 #[snafu(display("Failed to init reconciliation manager"))]
130 InitReconciliationManager {
131 #[snafu(implicit)]
132 location: Location,
133 source: common_meta::error::Error,
134 },
135
136 #[snafu(display("Failed to create default catalog and schema"))]
137 InitMetadata {
138 #[snafu(implicit)]
139 location: Location,
140 source: common_meta::error::Error,
141 },
142
143 #[snafu(display("Failed to allocate next sequence number"))]
144 NextSequence {
145 #[snafu(implicit)]
146 location: Location,
147 source: common_meta::error::Error,
148 },
149
150 #[snafu(display("Failed to set next sequence number"))]
151 SetNextSequence {
152 #[snafu(implicit)]
153 location: Location,
154 source: common_meta::error::Error,
155 },
156
157 #[snafu(display("Failed to peek sequence number"))]
158 PeekSequence {
159 #[snafu(implicit)]
160 location: Location,
161 source: common_meta::error::Error,
162 },
163
164 #[snafu(display("Failed to start telemetry task"))]
165 StartTelemetryTask {
166 #[snafu(implicit)]
167 location: Location,
168 source: common_runtime::error::Error,
169 },
170
171 #[snafu(display("Failed to submit ddl task"))]
172 SubmitDdlTask {
173 #[snafu(implicit)]
174 location: Location,
175 source: common_meta::error::Error,
176 },
177
178 #[snafu(display("Failed to submit reconcile procedure"))]
179 SubmitReconcileProcedure {
180 #[snafu(implicit)]
181 location: Location,
182 source: common_meta::error::Error,
183 },
184
185 #[snafu(display("Failed to invalidate table cache"))]
186 InvalidateTableCache {
187 #[snafu(implicit)]
188 location: Location,
189 source: common_meta::error::Error,
190 },
191
192 #[snafu(display("Failed to list catalogs"))]
193 ListCatalogs {
194 #[snafu(implicit)]
195 location: Location,
196 source: BoxedError,
197 },
198
199 #[snafu(display("Failed to list {}'s schemas", catalog))]
200 ListSchemas {
201 #[snafu(implicit)]
202 location: Location,
203 catalog: String,
204 source: BoxedError,
205 },
206
207 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
208 ListTables {
209 #[snafu(implicit)]
210 location: Location,
211 catalog: String,
212 schema: String,
213 source: BoxedError,
214 },
215
216 #[snafu(display("Failed to join a future"))]
217 Join {
218 #[snafu(implicit)]
219 location: Location,
220 #[snafu(source)]
221 error: JoinError,
222 },
223
224 #[snafu(display(
225 "Failed to request {}, required: {}, but only {} available",
226 select_target,
227 required,
228 available
229 ))]
230 NoEnoughAvailableNode {
231 #[snafu(implicit)]
232 location: Location,
233 required: usize,
234 available: usize,
235 select_target: SelectTarget,
236 },
237
238 #[snafu(display("Failed to send shutdown signal"))]
239 SendShutdownSignal {
240 #[snafu(source)]
241 error: SendError<()>,
242 },
243
244 #[snafu(display("Failed to shutdown {} server", server))]
245 ShutdownServer {
246 #[snafu(implicit)]
247 location: Location,
248 source: servers::error::Error,
249 server: String,
250 },
251
252 #[snafu(display("Empty key is not allowed"))]
253 EmptyKey {
254 #[snafu(implicit)]
255 location: Location,
256 },
257
258 #[snafu(display("Failed to execute via Etcd"))]
259 EtcdFailed {
260 #[snafu(source)]
261 error: etcd_client::Error,
262 #[snafu(implicit)]
263 location: Location,
264 },
265
266 #[snafu(display("Failed to connect to Etcd"))]
267 ConnectEtcd {
268 #[snafu(source)]
269 error: etcd_client::Error,
270 #[snafu(implicit)]
271 location: Location,
272 },
273
274 #[snafu(display("Failed to read file: {}", path))]
275 FileIo {
276 #[snafu(source)]
277 error: std::io::Error,
278 #[snafu(implicit)]
279 location: Location,
280 path: String,
281 },
282
283 #[snafu(display("Failed to bind address {}", addr))]
284 TcpBind {
285 addr: String,
286 #[snafu(source)]
287 error: std::io::Error,
288 #[snafu(implicit)]
289 location: Location,
290 },
291
292 #[snafu(display("Failed to start gRPC server"))]
293 StartGrpc {
294 #[snafu(source)]
295 error: tonic::transport::Error,
296 #[snafu(implicit)]
297 location: Location,
298 },
299
300 #[snafu(display("Failed to start http server"))]
301 StartHttp {
302 #[snafu(implicit)]
303 location: Location,
304 source: servers::error::Error,
305 },
306
307 #[snafu(display("Failed to parse address {}", addr))]
308 ParseAddr {
309 addr: String,
310 #[snafu(source)]
311 error: std::net::AddrParseError,
312 },
313
314 #[snafu(display("Invalid lease key: {}", key))]
315 InvalidLeaseKey {
316 key: String,
317 #[snafu(implicit)]
318 location: Location,
319 },
320
321 #[snafu(display("Invalid datanode stat key: {}", key))]
322 InvalidStatKey {
323 key: String,
324 #[snafu(implicit)]
325 location: Location,
326 },
327
328 #[snafu(display("Invalid inactive region key: {}", key))]
329 InvalidInactiveRegionKey {
330 key: String,
331 #[snafu(implicit)]
332 location: Location,
333 },
334
335 #[snafu(display("Failed to parse lease key from utf8"))]
336 LeaseKeyFromUtf8 {
337 #[snafu(source)]
338 error: std::string::FromUtf8Error,
339 #[snafu(implicit)]
340 location: Location,
341 },
342
343 #[snafu(display("Failed to parse lease value from utf8"))]
344 LeaseValueFromUtf8 {
345 #[snafu(source)]
346 error: std::string::FromUtf8Error,
347 #[snafu(implicit)]
348 location: Location,
349 },
350
351 #[snafu(display("Failed to parse invalid region key from utf8"))]
352 InvalidRegionKeyFromUtf8 {
353 #[snafu(source)]
354 error: std::string::FromUtf8Error,
355 #[snafu(implicit)]
356 location: Location,
357 },
358
359 #[snafu(display("Failed to serialize to json: {}", input))]
360 SerializeToJson {
361 input: String,
362 #[snafu(source)]
363 error: serde_json::error::Error,
364 #[snafu(implicit)]
365 location: Location,
366 },
367
368 #[snafu(display("Failed to deserialize from json: {}", input))]
369 DeserializeFromJson {
370 input: String,
371 #[snafu(source)]
372 error: serde_json::error::Error,
373 #[snafu(implicit)]
374 location: Location,
375 },
376
377 #[snafu(display("Failed to parse number: {}", err_msg))]
378 ParseNum {
379 err_msg: String,
380 #[snafu(source)]
381 error: std::num::ParseIntError,
382 #[snafu(implicit)]
383 location: Location,
384 },
385
386 #[snafu(display("Failed to parse bool: {}", err_msg))]
387 ParseBool {
388 err_msg: String,
389 #[snafu(source)]
390 error: std::str::ParseBoolError,
391 #[snafu(implicit)]
392 location: Location,
393 },
394
395 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
396 DowngradeLeader {
397 region_id: RegionId,
398 #[snafu(implicit)]
399 location: Location,
400 #[snafu(source)]
401 source: BoxedError,
402 },
403
404 #[snafu(display("Region's leader peer changed: {}", msg))]
405 LeaderPeerChanged {
406 msg: String,
407 #[snafu(implicit)]
408 location: Location,
409 },
410
411 #[snafu(display("Invalid arguments: {}", err_msg))]
412 InvalidArguments {
413 err_msg: String,
414 #[snafu(implicit)]
415 location: Location,
416 },
417
418 #[cfg(feature = "mysql_kvbackend")]
419 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
420 ParseMySqlUrl {
421 #[snafu(source)]
422 error: sqlx::error::Error,
423 mysql_url: String,
424 #[snafu(implicit)]
425 location: Location,
426 },
427
428 #[cfg(feature = "mysql_kvbackend")]
429 #[snafu(display("Failed to decode sql value"))]
430 DecodeSqlValue {
431 #[snafu(source)]
432 error: sqlx::error::Error,
433 #[snafu(implicit)]
434 location: Location,
435 },
436
437 #[snafu(display("Failed to find table route for {table_id}"))]
438 TableRouteNotFound {
439 table_id: TableId,
440 #[snafu(implicit)]
441 location: Location,
442 },
443
444 #[snafu(display("Failed to find table route for {region_id}"))]
445 RegionRouteNotFound {
446 region_id: RegionId,
447 #[snafu(implicit)]
448 location: Location,
449 },
450
451 #[snafu(display("Table info not found: {}", table_id))]
452 TableInfoNotFound {
453 table_id: TableId,
454 #[snafu(implicit)]
455 location: Location,
456 },
457
458 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
459 DatanodeTableNotFound {
460 table_id: TableId,
461 datanode_id: DatanodeId,
462 #[snafu(implicit)]
463 location: Location,
464 },
465
466 #[snafu(display("Metasrv has no leader at this moment"))]
467 NoLeader {
468 #[snafu(implicit)]
469 location: Location,
470 },
471
472 #[snafu(display("Leader lease expired"))]
473 LeaderLeaseExpired {
474 #[snafu(implicit)]
475 location: Location,
476 },
477
478 #[snafu(display("Leader lease changed during election"))]
479 LeaderLeaseChanged {
480 #[snafu(implicit)]
481 location: Location,
482 },
483
484 #[snafu(display("Table {} not found", name))]
485 TableNotFound {
486 name: String,
487 #[snafu(implicit)]
488 location: Location,
489 },
490
491 #[snafu(display("Unsupported selector type, {}", selector_type))]
492 UnsupportedSelectorType {
493 selector_type: String,
494 #[snafu(implicit)]
495 location: Location,
496 },
497
498 #[snafu(display("Unexpected, violated: {violated}"))]
499 Unexpected {
500 violated: String,
501 #[snafu(implicit)]
502 location: Location,
503 },
504
505 #[snafu(display("Failed to create gRPC channel"))]
506 CreateChannel {
507 #[snafu(implicit)]
508 location: Location,
509 source: common_grpc::error::Error,
510 },
511
512 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
513 BatchGet {
514 #[snafu(source)]
515 error: tonic::Status,
516 #[snafu(implicit)]
517 location: Location,
518 },
519
520 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
521 Range {
522 #[snafu(source)]
523 error: tonic::Status,
524 #[snafu(implicit)]
525 location: Location,
526 },
527
528 #[snafu(display("Response header not found"))]
529 ResponseHeaderNotFound {
530 #[snafu(implicit)]
531 location: Location,
532 },
533
534 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
535 IsNotLeader {
536 node_addr: String,
537 #[snafu(implicit)]
538 location: Location,
539 },
540
541 #[snafu(display("Invalid http body"))]
542 InvalidHttpBody {
543 #[snafu(source)]
544 error: http::Error,
545 #[snafu(implicit)]
546 location: Location,
547 },
548
549 #[snafu(display(
550 "The number of retries for the grpc call {} exceeded the limit, {}",
551 func_name,
552 retry_num
553 ))]
554 ExceededRetryLimit {
555 func_name: String,
556 retry_num: usize,
557 #[snafu(implicit)]
558 location: Location,
559 },
560
561 #[snafu(display("Invalid utf-8 value"))]
562 InvalidUtf8Value {
563 #[snafu(source)]
564 error: std::string::FromUtf8Error,
565 #[snafu(implicit)]
566 location: Location,
567 },
568
569 #[snafu(display("Missing required parameter, param: {:?}", param))]
570 MissingRequiredParameter { param: String },
571
572 #[snafu(display("Failed to start procedure manager"))]
573 StartProcedureManager {
574 #[snafu(implicit)]
575 location: Location,
576 source: common_procedure::Error,
577 },
578
579 #[snafu(display("Failed to stop procedure manager"))]
580 StopProcedureManager {
581 #[snafu(implicit)]
582 location: Location,
583 source: common_procedure::Error,
584 },
585
586 #[snafu(display("Failed to wait procedure done"))]
587 WaitProcedure {
588 #[snafu(implicit)]
589 location: Location,
590 source: common_procedure::Error,
591 },
592
593 #[snafu(display("Failed to query procedure state"))]
594 QueryProcedure {
595 #[snafu(implicit)]
596 location: Location,
597 source: common_procedure::Error,
598 },
599
600 #[snafu(display("Procedure not found: {pid}"))]
601 ProcedureNotFound {
602 #[snafu(implicit)]
603 location: Location,
604 pid: String,
605 },
606
607 #[snafu(display("Failed to submit procedure"))]
608 SubmitProcedure {
609 #[snafu(implicit)]
610 location: Location,
611 source: common_procedure::Error,
612 },
613
614 #[snafu(display("A prune task for topic {} is already running", topic))]
615 PruneTaskAlreadyRunning {
616 topic: String,
617 #[snafu(implicit)]
618 location: Location,
619 },
620
621 #[snafu(display("Schema already exists, name: {schema_name}"))]
622 SchemaAlreadyExists {
623 schema_name: String,
624 #[snafu(implicit)]
625 location: Location,
626 },
627
628 #[snafu(display("Table already exists: {table_name}"))]
629 TableAlreadyExists {
630 table_name: String,
631 #[snafu(implicit)]
632 location: Location,
633 },
634
635 #[snafu(display("Pusher not found: {pusher_id}"))]
636 PusherNotFound {
637 pusher_id: String,
638 #[snafu(implicit)]
639 location: Location,
640 },
641
642 #[snafu(display("Failed to push message: {err_msg}"))]
643 PushMessage {
644 err_msg: String,
645 #[snafu(implicit)]
646 location: Location,
647 },
648
649 #[snafu(display("Mailbox already closed: {id}"))]
650 MailboxClosed {
651 id: u64,
652 #[snafu(implicit)]
653 location: Location,
654 },
655
656 #[snafu(display("Mailbox timeout: {id}"))]
657 MailboxTimeout {
658 id: u64,
659 #[snafu(implicit)]
660 location: Location,
661 },
662
663 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
664 MailboxReceiver {
665 id: u64,
666 err_msg: String,
667 #[snafu(implicit)]
668 location: Location,
669 },
670
671 #[snafu(display("Mailbox channel closed: {channel}"))]
672 MailboxChannelClosed {
673 channel: Channel,
674 #[snafu(implicit)]
675 location: Location,
676 },
677
678 #[snafu(display("Missing request header"))]
679 MissingRequestHeader {
680 #[snafu(implicit)]
681 location: Location,
682 },
683
684 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
685 RegisterProcedureLoader {
686 type_name: String,
687 #[snafu(implicit)]
688 location: Location,
689 source: common_procedure::error::Error,
690 },
691
692 #[snafu(display(
693 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
694 mailbox_message,
695 reason
696 ))]
697 UnexpectedInstructionReply {
698 mailbox_message: String,
699 reason: String,
700 #[snafu(implicit)]
701 location: Location,
702 },
703
704 #[snafu(display("Expected to retry later, reason: {}", reason))]
705 RetryLater {
706 reason: String,
707 #[snafu(implicit)]
708 location: Location,
709 },
710
711 #[snafu(display("Expected to retry later, reason: {}", reason))]
712 RetryLaterWithSource {
713 reason: String,
714 #[snafu(implicit)]
715 location: Location,
716 source: BoxedError,
717 },
718
719 #[snafu(display("Failed to convert proto data"))]
720 ConvertProtoData {
721 #[snafu(implicit)]
722 location: Location,
723 source: common_meta::error::Error,
724 },
725
726 #[snafu(display("Other error"))]
729 Other {
730 source: BoxedError,
731 #[snafu(implicit)]
732 location: Location,
733 },
734
735 #[snafu(display("Table metadata manager error"))]
736 TableMetadataManager {
737 source: common_meta::error::Error,
738 #[snafu(implicit)]
739 location: Location,
740 },
741
742 #[snafu(display("Runtime switch manager error"))]
743 RuntimeSwitchManager {
744 source: common_meta::error::Error,
745 #[snafu(implicit)]
746 location: Location,
747 },
748
749 #[snafu(display("Keyvalue backend error"))]
750 KvBackend {
751 source: common_meta::error::Error,
752 #[snafu(implicit)]
753 location: Location,
754 },
755
756 #[snafu(display("Failed to publish message"))]
757 PublishMessage {
758 #[snafu(source)]
759 error: SendError<Message>,
760 #[snafu(implicit)]
761 location: Location,
762 },
763
764 #[snafu(display("Too many partitions"))]
765 TooManyPartitions {
766 #[snafu(implicit)]
767 location: Location,
768 },
769
770 #[snafu(display("Unsupported operation {}", operation))]
771 Unsupported {
772 operation: String,
773 #[snafu(implicit)]
774 location: Location,
775 },
776
777 #[snafu(display("Unexpected table route type: {}", err_msg))]
778 UnexpectedLogicalRouteTable {
779 #[snafu(implicit)]
780 location: Location,
781 err_msg: String,
782 source: common_meta::error::Error,
783 },
784
785 #[snafu(display("Failed to save cluster info"))]
786 SaveClusterInfo {
787 #[snafu(implicit)]
788 location: Location,
789 source: common_meta::error::Error,
790 },
791
792 #[snafu(display("Invalid cluster info format"))]
793 InvalidClusterInfoFormat {
794 #[snafu(implicit)]
795 location: Location,
796 source: common_meta::error::Error,
797 },
798
799 #[snafu(display("Invalid datanode stat format"))]
800 InvalidDatanodeStatFormat {
801 #[snafu(implicit)]
802 location: Location,
803 source: common_meta::error::Error,
804 },
805
806 #[snafu(display("Invalid node info format"))]
807 InvalidNodeInfoFormat {
808 #[snafu(implicit)]
809 location: Location,
810 source: common_meta::error::Error,
811 },
812
813 #[snafu(display("Failed to serialize options to TOML"))]
814 TomlFormat {
815 #[snafu(implicit)]
816 location: Location,
817 #[snafu(source(from(common_config::error::Error, Box::new)))]
818 source: Box<common_config::error::Error>,
819 },
820
821 #[cfg(feature = "pg_kvbackend")]
822 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
823 PostgresExecution {
824 #[snafu(source)]
825 error: tokio_postgres::Error,
826 sql: String,
827 #[snafu(implicit)]
828 location: Location,
829 },
830
831 #[cfg(feature = "pg_kvbackend")]
832 #[snafu(display("Failed to get Postgres client"))]
833 GetPostgresClient {
834 #[snafu(implicit)]
835 location: Location,
836 #[snafu(source)]
837 error: deadpool::managed::PoolError<tokio_postgres::Error>,
838 },
839
840 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
841 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
842 SqlExecutionTimeout {
843 #[snafu(implicit)]
844 location: Location,
845 sql: String,
846 duration: std::time::Duration,
847 },
848
849 #[cfg(feature = "pg_kvbackend")]
850 #[snafu(display("Failed to create connection pool for Postgres"))]
851 CreatePostgresPool {
852 #[snafu(source)]
853 error: deadpool_postgres::CreatePoolError,
854 #[snafu(implicit)]
855 location: Location,
856 },
857
858 #[cfg(feature = "pg_kvbackend")]
859 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
860 GetPostgresConnection {
861 reason: String,
862 #[snafu(implicit)]
863 location: Location,
864 },
865
866 #[cfg(feature = "mysql_kvbackend")]
867 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
868 MySqlExecution {
869 #[snafu(source)]
870 error: sqlx::Error,
871 #[snafu(implicit)]
872 location: Location,
873 sql: String,
874 },
875
876 #[cfg(feature = "mysql_kvbackend")]
877 #[snafu(display("Failed to create mysql pool"))]
878 CreateMySqlPool {
879 #[snafu(source)]
880 error: sqlx::Error,
881 #[snafu(implicit)]
882 location: Location,
883 },
884
885 #[cfg(feature = "mysql_kvbackend")]
886 #[snafu(display("Failed to acquire mysql client from pool"))]
887 AcquireMySqlClient {
888 #[snafu(source)]
889 error: sqlx::Error,
890 #[snafu(implicit)]
891 location: Location,
892 },
893
894 #[snafu(display("Handler not found: {}", name))]
895 HandlerNotFound {
896 name: String,
897 #[snafu(implicit)]
898 location: Location,
899 },
900
901 #[snafu(display("Flow state handler error"))]
902 FlowStateHandler {
903 #[snafu(implicit)]
904 location: Location,
905 source: common_meta::error::Error,
906 },
907
908 #[snafu(display("Failed to build wal options allocator"))]
909 BuildWalOptionsAllocator {
910 #[snafu(implicit)]
911 location: Location,
912 source: common_meta::error::Error,
913 },
914
915 #[snafu(display("Failed to parse wal options"))]
916 ParseWalOptions {
917 #[snafu(implicit)]
918 location: Location,
919 source: common_meta::error::Error,
920 },
921
922 #[snafu(display("Failed to build kafka client."))]
923 BuildKafkaClient {
924 #[snafu(implicit)]
925 location: Location,
926 #[snafu(source)]
927 error: common_meta::error::Error,
928 },
929
930 #[snafu(display(
931 "Failed to build a Kafka partition client, topic: {}, partition: {}",
932 topic,
933 partition
934 ))]
935 BuildPartitionClient {
936 topic: String,
937 partition: i32,
938 #[snafu(implicit)]
939 location: Location,
940 #[snafu(source)]
941 error: rskafka::client::error::Error,
942 },
943
944 #[snafu(display(
945 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
946 topic,
947 partition,
948 offset
949 ))]
950 DeleteRecords {
951 #[snafu(implicit)]
952 location: Location,
953 #[snafu(source)]
954 error: rskafka::client::error::Error,
955 topic: String,
956 partition: i32,
957 offset: u64,
958 },
959
960 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
961 GetOffset {
962 topic: String,
963 #[snafu(implicit)]
964 location: Location,
965 #[snafu(source)]
966 error: rskafka::client::error::Error,
967 },
968
969 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
970 UpdateTopicNameValue {
971 topic: String,
972 #[snafu(implicit)]
973 location: Location,
974 #[snafu(source)]
975 source: common_meta::error::Error,
976 },
977
978 #[snafu(display("Failed to build tls options"))]
979 BuildTlsOptions {
980 #[snafu(implicit)]
981 location: Location,
982 #[snafu(source)]
983 source: common_meta::error::Error,
984 },
985}
986
987impl Error {
988 pub fn is_retryable(&self) -> bool {
990 matches!(
991 self,
992 Error::RetryLater { .. }
993 | Error::RetryLaterWithSource { .. }
994 | Error::MailboxTimeout { .. }
995 )
996 }
997}
998
999pub type Result<T> = std::result::Result<T, Error>;
1000
1001define_into_tonic_status!(Error);
1002
1003impl ErrorExt for Error {
1004 fn status_code(&self) -> StatusCode {
1005 match self {
1006 Error::EtcdFailed { .. }
1007 | Error::ConnectEtcd { .. }
1008 | Error::FileIo { .. }
1009 | Error::TcpBind { .. }
1010 | Error::SerializeToJson { .. }
1011 | Error::DeserializeFromJson { .. }
1012 | Error::NoLeader { .. }
1013 | Error::LeaderLeaseExpired { .. }
1014 | Error::LeaderLeaseChanged { .. }
1015 | Error::CreateChannel { .. }
1016 | Error::BatchGet { .. }
1017 | Error::Range { .. }
1018 | Error::ResponseHeaderNotFound { .. }
1019 | Error::InvalidHttpBody { .. }
1020 | Error::ExceededRetryLimit { .. }
1021 | Error::SendShutdownSignal { .. }
1022 | Error::PushMessage { .. }
1023 | Error::MailboxClosed { .. }
1024 | Error::MailboxReceiver { .. }
1025 | Error::StartGrpc { .. }
1026 | Error::PublishMessage { .. }
1027 | Error::Join { .. }
1028 | Error::ChooseItems { .. }
1029 | Error::FlowStateHandler { .. }
1030 | Error::BuildWalOptionsAllocator { .. }
1031 | Error::BuildPartitionClient { .. }
1032 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1033
1034 Error::DeleteRecords { .. }
1035 | Error::GetOffset { .. }
1036 | Error::PeerUnavailable { .. }
1037 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1038 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1039 Error::PruneTaskAlreadyRunning { .. }
1040 | Error::RetryLater { .. }
1041 | Error::MailboxChannelClosed { .. }
1042 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1043 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1044
1045 Error::Unsupported { .. } => StatusCode::Unsupported,
1046
1047 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1048
1049 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1050 Error::EmptyKey { .. }
1051 | Error::MissingRequiredParameter { .. }
1052 | Error::MissingRequestHeader { .. }
1053 | Error::InvalidLeaseKey { .. }
1054 | Error::InvalidStatKey { .. }
1055 | Error::InvalidInactiveRegionKey { .. }
1056 | Error::ParseNum { .. }
1057 | Error::ParseBool { .. }
1058 | Error::ParseAddr { .. }
1059 | Error::UnsupportedSelectorType { .. }
1060 | Error::InvalidArguments { .. }
1061 | Error::ProcedureNotFound { .. }
1062 | Error::TooManyPartitions { .. }
1063 | Error::TomlFormat { .. }
1064 | Error::HandlerNotFound { .. }
1065 | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
1066 Error::LeaseKeyFromUtf8 { .. }
1067 | Error::LeaseValueFromUtf8 { .. }
1068 | Error::InvalidRegionKeyFromUtf8 { .. }
1069 | Error::TableRouteNotFound { .. }
1070 | Error::TableInfoNotFound { .. }
1071 | Error::DatanodeTableNotFound { .. }
1072 | Error::InvalidUtf8Value { .. }
1073 | Error::UnexpectedInstructionReply { .. }
1074 | Error::Unexpected { .. }
1075 | Error::RegionOpeningRace { .. }
1076 | Error::RegionRouteNotFound { .. }
1077 | Error::MigrationAbort { .. }
1078 | Error::MigrationRunning { .. }
1079 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1080 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1081 Error::SaveClusterInfo { source, .. }
1082 | Error::InvalidClusterInfoFormat { source, .. }
1083 | Error::InvalidDatanodeStatFormat { source, .. }
1084 | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1085 Error::InvalidateTableCache { source, .. } => source.status_code(),
1086 Error::SubmitProcedure { source, .. }
1087 | Error::WaitProcedure { source, .. }
1088 | Error::QueryProcedure { source, .. } => source.status_code(),
1089 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1090 source.status_code()
1091 }
1092 Error::StartProcedureManager { source, .. }
1093 | Error::StopProcedureManager { source, .. } => source.status_code(),
1094
1095 Error::ListCatalogs { source, .. }
1096 | Error::ListSchemas { source, .. }
1097 | Error::ListTables { source, .. } => source.status_code(),
1098 Error::StartTelemetryTask { source, .. } => source.status_code(),
1099
1100 Error::NextSequence { source, .. }
1101 | Error::SetNextSequence { source, .. }
1102 | Error::PeekSequence { source, .. } => source.status_code(),
1103 Error::DowngradeLeader { source, .. } => source.status_code(),
1104 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1105 Error::SubmitDdlTask { source, .. }
1106 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1107 Error::ConvertProtoData { source, .. }
1108 | Error::TableMetadataManager { source, .. }
1109 | Error::RuntimeSwitchManager { source, .. }
1110 | Error::KvBackend { source, .. }
1111 | Error::UnexpectedLogicalRouteTable { source, .. }
1112 | Error::UpdateTopicNameValue { source, .. }
1113 | Error::ParseWalOptions { source, .. } => source.status_code(),
1114 Error::ListActiveFrontends { source, .. }
1115 | Error::ListActiveDatanodes { source, .. }
1116 | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1117 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1118
1119 Error::InitMetadata { source, .. }
1120 | Error::InitDdlManager { source, .. }
1121 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1122
1123 Error::BuildTlsOptions { source, .. } => source.status_code(),
1124 Error::Other { source, .. } => source.status_code(),
1125 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1126
1127 #[cfg(feature = "pg_kvbackend")]
1128 Error::CreatePostgresPool { .. }
1129 | Error::GetPostgresClient { .. }
1130 | Error::GetPostgresConnection { .. }
1131 | Error::PostgresExecution { .. } => StatusCode::Internal,
1132 #[cfg(feature = "mysql_kvbackend")]
1133 Error::MySqlExecution { .. }
1134 | Error::CreateMySqlPool { .. }
1135 | Error::ParseMySqlUrl { .. }
1136 | Error::DecodeSqlValue { .. }
1137 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1138 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1139 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1140 }
1141 }
1142
1143 fn as_any(&self) -> &dyn std::any::Any {
1144 self
1145 }
1146}
1147
1148pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1150 let mut err: &(dyn std::error::Error + 'static) = err_status;
1151
1152 loop {
1153 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1154 return Some(io_err);
1155 }
1156
1157 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1160 && let Some(io_err) = h2_err.get_io()
1161 {
1162 return Some(io_err);
1163 }
1164
1165 err = err.source()?;
1166 }
1167}