1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Failed to choose items"))]
36 ChooseItems {
37 #[snafu(implicit)]
38 location: Location,
39 #[snafu(source)]
40 error: rand::distr::weighted::Error,
41 },
42
43 #[snafu(display("Exceeded deadline, operation: {}", operation))]
44 ExceededDeadline {
45 #[snafu(implicit)]
46 location: Location,
47 operation: String,
48 },
49
50 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51 PeerUnavailable {
52 #[snafu(implicit)]
53 location: Location,
54 peer_id: u64,
55 },
56
57 #[snafu(display("Failed to lookup frontends"))]
58 LookupFrontends {
59 #[snafu(implicit)]
60 location: Location,
61 source: common_meta::error::Error,
62 },
63
64 #[snafu(display("No available frontend"))]
65 NoAvailableFrontend {
66 #[snafu(implicit)]
67 location: Location,
68 },
69
70 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
71 MigrationRunning {
72 #[snafu(implicit)]
73 location: Location,
74 region_id: RegionId,
75 },
76
77 #[snafu(display(
78 "The region migration procedure is completed for region: {}, target_peer: {}",
79 region_id,
80 target_peer_id
81 ))]
82 RegionMigrated {
83 #[snafu(implicit)]
84 location: Location,
85 region_id: RegionId,
86 target_peer_id: u64,
87 },
88
89 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
90 MigrationAbort {
91 #[snafu(implicit)]
92 location: Location,
93 reason: String,
94 },
95
96 #[snafu(display(
97 "Another procedure is opening the region: {} on peer: {}",
98 region_id,
99 peer_id
100 ))]
101 RegionOpeningRace {
102 #[snafu(implicit)]
103 location: Location,
104 peer_id: DatanodeId,
105 region_id: RegionId,
106 },
107
108 #[snafu(display("Failed to init ddl manager"))]
109 InitDdlManager {
110 #[snafu(implicit)]
111 location: Location,
112 source: common_meta::error::Error,
113 },
114
115 #[snafu(display("Failed to init reconciliation manager"))]
116 InitReconciliationManager {
117 #[snafu(implicit)]
118 location: Location,
119 source: common_meta::error::Error,
120 },
121
122 #[snafu(display("Failed to create default catalog and schema"))]
123 InitMetadata {
124 #[snafu(implicit)]
125 location: Location,
126 source: common_meta::error::Error,
127 },
128
129 #[snafu(display("Failed to allocate next sequence number"))]
130 NextSequence {
131 #[snafu(implicit)]
132 location: Location,
133 source: common_meta::error::Error,
134 },
135
136 #[snafu(display("Failed to set next sequence number"))]
137 SetNextSequence {
138 #[snafu(implicit)]
139 location: Location,
140 source: common_meta::error::Error,
141 },
142
143 #[snafu(display("Failed to peek sequence number"))]
144 PeekSequence {
145 #[snafu(implicit)]
146 location: Location,
147 source: common_meta::error::Error,
148 },
149
150 #[snafu(display("Failed to start telemetry task"))]
151 StartTelemetryTask {
152 #[snafu(implicit)]
153 location: Location,
154 source: common_runtime::error::Error,
155 },
156
157 #[snafu(display("Failed to submit ddl task"))]
158 SubmitDdlTask {
159 #[snafu(implicit)]
160 location: Location,
161 source: common_meta::error::Error,
162 },
163
164 #[snafu(display("Failed to submit reconcile procedure"))]
165 SubmitReconcileProcedure {
166 #[snafu(implicit)]
167 location: Location,
168 source: common_meta::error::Error,
169 },
170
171 #[snafu(display("Failed to invalidate table cache"))]
172 InvalidateTableCache {
173 #[snafu(implicit)]
174 location: Location,
175 source: common_meta::error::Error,
176 },
177
178 #[snafu(display("Failed to list catalogs"))]
179 ListCatalogs {
180 #[snafu(implicit)]
181 location: Location,
182 source: BoxedError,
183 },
184
185 #[snafu(display("Failed to list {}'s schemas", catalog))]
186 ListSchemas {
187 #[snafu(implicit)]
188 location: Location,
189 catalog: String,
190 source: BoxedError,
191 },
192
193 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
194 ListTables {
195 #[snafu(implicit)]
196 location: Location,
197 catalog: String,
198 schema: String,
199 source: BoxedError,
200 },
201
202 #[snafu(display("Failed to join a future"))]
203 Join {
204 #[snafu(implicit)]
205 location: Location,
206 #[snafu(source)]
207 error: JoinError,
208 },
209
210 #[snafu(display(
211 "Failed to request {}, required: {}, but only {} available",
212 select_target,
213 required,
214 available
215 ))]
216 NoEnoughAvailableNode {
217 #[snafu(implicit)]
218 location: Location,
219 required: usize,
220 available: usize,
221 select_target: SelectTarget,
222 },
223
224 #[snafu(display("Failed to send shutdown signal"))]
225 SendShutdownSignal {
226 #[snafu(source)]
227 error: SendError<()>,
228 },
229
230 #[snafu(display("Failed to shutdown {} server", server))]
231 ShutdownServer {
232 #[snafu(implicit)]
233 location: Location,
234 source: servers::error::Error,
235 server: String,
236 },
237
238 #[snafu(display("Empty key is not allowed"))]
239 EmptyKey {
240 #[snafu(implicit)]
241 location: Location,
242 },
243
244 #[snafu(display("Failed to execute via Etcd"))]
245 EtcdFailed {
246 #[snafu(source)]
247 error: etcd_client::Error,
248 #[snafu(implicit)]
249 location: Location,
250 },
251
252 #[snafu(display("Failed to connect to Etcd"))]
253 ConnectEtcd {
254 #[snafu(source)]
255 error: etcd_client::Error,
256 #[snafu(implicit)]
257 location: Location,
258 },
259
260 #[snafu(display("Failed to read file: {}", path))]
261 FileIo {
262 #[snafu(source)]
263 error: std::io::Error,
264 #[snafu(implicit)]
265 location: Location,
266 path: String,
267 },
268
269 #[snafu(display("Failed to bind address {}", addr))]
270 TcpBind {
271 addr: String,
272 #[snafu(source)]
273 error: std::io::Error,
274 #[snafu(implicit)]
275 location: Location,
276 },
277
278 #[snafu(display("Failed to start gRPC server"))]
279 StartGrpc {
280 #[snafu(source)]
281 error: tonic::transport::Error,
282 #[snafu(implicit)]
283 location: Location,
284 },
285
286 #[snafu(display("Failed to start http server"))]
287 StartHttp {
288 #[snafu(implicit)]
289 location: Location,
290 source: servers::error::Error,
291 },
292
293 #[snafu(display("Failed to init export metrics task"))]
294 InitExportMetricsTask {
295 #[snafu(implicit)]
296 location: Location,
297 source: servers::error::Error,
298 },
299
300 #[snafu(display("Failed to parse address {}", addr))]
301 ParseAddr {
302 addr: String,
303 #[snafu(source)]
304 error: std::net::AddrParseError,
305 },
306
307 #[snafu(display("Invalid lease key: {}", key))]
308 InvalidLeaseKey {
309 key: String,
310 #[snafu(implicit)]
311 location: Location,
312 },
313
314 #[snafu(display("Invalid datanode stat key: {}", key))]
315 InvalidStatKey {
316 key: String,
317 #[snafu(implicit)]
318 location: Location,
319 },
320
321 #[snafu(display("Invalid inactive region key: {}", key))]
322 InvalidInactiveRegionKey {
323 key: String,
324 #[snafu(implicit)]
325 location: Location,
326 },
327
328 #[snafu(display("Failed to parse lease key from utf8"))]
329 LeaseKeyFromUtf8 {
330 #[snafu(source)]
331 error: std::string::FromUtf8Error,
332 #[snafu(implicit)]
333 location: Location,
334 },
335
336 #[snafu(display("Failed to parse lease value from utf8"))]
337 LeaseValueFromUtf8 {
338 #[snafu(source)]
339 error: std::string::FromUtf8Error,
340 #[snafu(implicit)]
341 location: Location,
342 },
343
344 #[snafu(display("Failed to parse invalid region key from utf8"))]
345 InvalidRegionKeyFromUtf8 {
346 #[snafu(source)]
347 error: std::string::FromUtf8Error,
348 #[snafu(implicit)]
349 location: Location,
350 },
351
352 #[snafu(display("Failed to serialize to json: {}", input))]
353 SerializeToJson {
354 input: String,
355 #[snafu(source)]
356 error: serde_json::error::Error,
357 #[snafu(implicit)]
358 location: Location,
359 },
360
361 #[snafu(display("Failed to deserialize from json: {}", input))]
362 DeserializeFromJson {
363 input: String,
364 #[snafu(source)]
365 error: serde_json::error::Error,
366 #[snafu(implicit)]
367 location: Location,
368 },
369
370 #[snafu(display("Failed to parse number: {}", err_msg))]
371 ParseNum {
372 err_msg: String,
373 #[snafu(source)]
374 error: std::num::ParseIntError,
375 #[snafu(implicit)]
376 location: Location,
377 },
378
379 #[snafu(display("Failed to parse bool: {}", err_msg))]
380 ParseBool {
381 err_msg: String,
382 #[snafu(source)]
383 error: std::str::ParseBoolError,
384 #[snafu(implicit)]
385 location: Location,
386 },
387
388 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
389 DowngradeLeader {
390 region_id: RegionId,
391 #[snafu(implicit)]
392 location: Location,
393 #[snafu(source)]
394 source: BoxedError,
395 },
396
397 #[snafu(display("Region's leader peer changed: {}", msg))]
398 LeaderPeerChanged {
399 msg: String,
400 #[snafu(implicit)]
401 location: Location,
402 },
403
404 #[snafu(display("Invalid arguments: {}", err_msg))]
405 InvalidArguments {
406 err_msg: String,
407 #[snafu(implicit)]
408 location: Location,
409 },
410
411 #[cfg(feature = "mysql_kvbackend")]
412 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
413 ParseMySqlUrl {
414 #[snafu(source)]
415 error: sqlx::error::Error,
416 mysql_url: String,
417 #[snafu(implicit)]
418 location: Location,
419 },
420
421 #[cfg(feature = "mysql_kvbackend")]
422 #[snafu(display("Failed to decode sql value"))]
423 DecodeSqlValue {
424 #[snafu(source)]
425 error: sqlx::error::Error,
426 #[snafu(implicit)]
427 location: Location,
428 },
429
430 #[snafu(display("Failed to find table route for {table_id}"))]
431 TableRouteNotFound {
432 table_id: TableId,
433 #[snafu(implicit)]
434 location: Location,
435 },
436
437 #[snafu(display("Failed to find table route for {region_id}"))]
438 RegionRouteNotFound {
439 region_id: RegionId,
440 #[snafu(implicit)]
441 location: Location,
442 },
443
444 #[snafu(display("Table info not found: {}", table_id))]
445 TableInfoNotFound {
446 table_id: TableId,
447 #[snafu(implicit)]
448 location: Location,
449 },
450
451 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
452 DatanodeTableNotFound {
453 table_id: TableId,
454 datanode_id: DatanodeId,
455 #[snafu(implicit)]
456 location: Location,
457 },
458
459 #[snafu(display("Metasrv has no leader at this moment"))]
460 NoLeader {
461 #[snafu(implicit)]
462 location: Location,
463 },
464
465 #[snafu(display("Leader lease expired"))]
466 LeaderLeaseExpired {
467 #[snafu(implicit)]
468 location: Location,
469 },
470
471 #[snafu(display("Leader lease changed during election"))]
472 LeaderLeaseChanged {
473 #[snafu(implicit)]
474 location: Location,
475 },
476
477 #[snafu(display("Table {} not found", name))]
478 TableNotFound {
479 name: String,
480 #[snafu(implicit)]
481 location: Location,
482 },
483
484 #[snafu(display("Unsupported selector type, {}", selector_type))]
485 UnsupportedSelectorType {
486 selector_type: String,
487 #[snafu(implicit)]
488 location: Location,
489 },
490
491 #[snafu(display("Unexpected, violated: {violated}"))]
492 Unexpected {
493 violated: String,
494 #[snafu(implicit)]
495 location: Location,
496 },
497
498 #[snafu(display("Failed to create gRPC channel"))]
499 CreateChannel {
500 #[snafu(implicit)]
501 location: Location,
502 source: common_grpc::error::Error,
503 },
504
505 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
506 BatchGet {
507 #[snafu(source)]
508 error: tonic::Status,
509 #[snafu(implicit)]
510 location: Location,
511 },
512
513 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
514 Range {
515 #[snafu(source)]
516 error: tonic::Status,
517 #[snafu(implicit)]
518 location: Location,
519 },
520
521 #[snafu(display("Response header not found"))]
522 ResponseHeaderNotFound {
523 #[snafu(implicit)]
524 location: Location,
525 },
526
527 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
528 IsNotLeader {
529 node_addr: String,
530 #[snafu(implicit)]
531 location: Location,
532 },
533
534 #[snafu(display("Invalid http body"))]
535 InvalidHttpBody {
536 #[snafu(source)]
537 error: http::Error,
538 #[snafu(implicit)]
539 location: Location,
540 },
541
542 #[snafu(display(
543 "The number of retries for the grpc call {} exceeded the limit, {}",
544 func_name,
545 retry_num
546 ))]
547 ExceededRetryLimit {
548 func_name: String,
549 retry_num: usize,
550 #[snafu(implicit)]
551 location: Location,
552 },
553
554 #[snafu(display("Invalid utf-8 value"))]
555 InvalidUtf8Value {
556 #[snafu(source)]
557 error: std::string::FromUtf8Error,
558 #[snafu(implicit)]
559 location: Location,
560 },
561
562 #[snafu(display("Missing required parameter, param: {:?}", param))]
563 MissingRequiredParameter { param: String },
564
565 #[snafu(display("Failed to start procedure manager"))]
566 StartProcedureManager {
567 #[snafu(implicit)]
568 location: Location,
569 source: common_procedure::Error,
570 },
571
572 #[snafu(display("Failed to stop procedure manager"))]
573 StopProcedureManager {
574 #[snafu(implicit)]
575 location: Location,
576 source: common_procedure::Error,
577 },
578
579 #[snafu(display("Failed to wait procedure done"))]
580 WaitProcedure {
581 #[snafu(implicit)]
582 location: Location,
583 source: common_procedure::Error,
584 },
585
586 #[snafu(display("Failed to query procedure state"))]
587 QueryProcedure {
588 #[snafu(implicit)]
589 location: Location,
590 source: common_procedure::Error,
591 },
592
593 #[snafu(display("Procedure not found: {pid}"))]
594 ProcedureNotFound {
595 #[snafu(implicit)]
596 location: Location,
597 pid: String,
598 },
599
600 #[snafu(display("Failed to submit procedure"))]
601 SubmitProcedure {
602 #[snafu(implicit)]
603 location: Location,
604 source: common_procedure::Error,
605 },
606
607 #[snafu(display("A prune task for topic {} is already running", topic))]
608 PruneTaskAlreadyRunning {
609 topic: String,
610 #[snafu(implicit)]
611 location: Location,
612 },
613
614 #[snafu(display("Schema already exists, name: {schema_name}"))]
615 SchemaAlreadyExists {
616 schema_name: String,
617 #[snafu(implicit)]
618 location: Location,
619 },
620
621 #[snafu(display("Table already exists: {table_name}"))]
622 TableAlreadyExists {
623 table_name: String,
624 #[snafu(implicit)]
625 location: Location,
626 },
627
628 #[snafu(display("Pusher not found: {pusher_id}"))]
629 PusherNotFound {
630 pusher_id: String,
631 #[snafu(implicit)]
632 location: Location,
633 },
634
635 #[snafu(display("Failed to push message: {err_msg}"))]
636 PushMessage {
637 err_msg: String,
638 #[snafu(implicit)]
639 location: Location,
640 },
641
642 #[snafu(display("Mailbox already closed: {id}"))]
643 MailboxClosed {
644 id: u64,
645 #[snafu(implicit)]
646 location: Location,
647 },
648
649 #[snafu(display("Mailbox timeout: {id}"))]
650 MailboxTimeout {
651 id: u64,
652 #[snafu(implicit)]
653 location: Location,
654 },
655
656 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
657 MailboxReceiver {
658 id: u64,
659 err_msg: String,
660 #[snafu(implicit)]
661 location: Location,
662 },
663
664 #[snafu(display("Mailbox channel closed: {channel}"))]
665 MailboxChannelClosed {
666 channel: Channel,
667 #[snafu(implicit)]
668 location: Location,
669 },
670
671 #[snafu(display("Missing request header"))]
672 MissingRequestHeader {
673 #[snafu(implicit)]
674 location: Location,
675 },
676
677 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
678 RegisterProcedureLoader {
679 type_name: String,
680 #[snafu(implicit)]
681 location: Location,
682 source: common_procedure::error::Error,
683 },
684
685 #[snafu(display(
686 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
687 mailbox_message,
688 reason
689 ))]
690 UnexpectedInstructionReply {
691 mailbox_message: String,
692 reason: String,
693 #[snafu(implicit)]
694 location: Location,
695 },
696
697 #[snafu(display("Expected to retry later, reason: {}", reason))]
698 RetryLater {
699 reason: String,
700 #[snafu(implicit)]
701 location: Location,
702 },
703
704 #[snafu(display("Expected to retry later, reason: {}", reason))]
705 RetryLaterWithSource {
706 reason: String,
707 #[snafu(implicit)]
708 location: Location,
709 source: BoxedError,
710 },
711
712 #[snafu(display("Failed to convert proto data"))]
713 ConvertProtoData {
714 #[snafu(implicit)]
715 location: Location,
716 source: common_meta::error::Error,
717 },
718
719 #[snafu(display("Other error"))]
722 Other {
723 source: BoxedError,
724 #[snafu(implicit)]
725 location: Location,
726 },
727
728 #[snafu(display("Table metadata manager error"))]
729 TableMetadataManager {
730 source: common_meta::error::Error,
731 #[snafu(implicit)]
732 location: Location,
733 },
734
735 #[snafu(display("Runtime switch manager error"))]
736 RuntimeSwitchManager {
737 source: common_meta::error::Error,
738 #[snafu(implicit)]
739 location: Location,
740 },
741
742 #[snafu(display("Keyvalue backend error"))]
743 KvBackend {
744 source: common_meta::error::Error,
745 #[snafu(implicit)]
746 location: Location,
747 },
748
749 #[snafu(display("Failed to publish message"))]
750 PublishMessage {
751 #[snafu(source)]
752 error: SendError<Message>,
753 #[snafu(implicit)]
754 location: Location,
755 },
756
757 #[snafu(display("Too many partitions"))]
758 TooManyPartitions {
759 #[snafu(implicit)]
760 location: Location,
761 },
762
763 #[snafu(display("Unsupported operation {}", operation))]
764 Unsupported {
765 operation: String,
766 #[snafu(implicit)]
767 location: Location,
768 },
769
770 #[snafu(display("Unexpected table route type: {}", err_msg))]
771 UnexpectedLogicalRouteTable {
772 #[snafu(implicit)]
773 location: Location,
774 err_msg: String,
775 source: common_meta::error::Error,
776 },
777
778 #[snafu(display("Failed to save cluster info"))]
779 SaveClusterInfo {
780 #[snafu(implicit)]
781 location: Location,
782 source: common_meta::error::Error,
783 },
784
785 #[snafu(display("Invalid cluster info format"))]
786 InvalidClusterInfoFormat {
787 #[snafu(implicit)]
788 location: Location,
789 source: common_meta::error::Error,
790 },
791
792 #[snafu(display("Invalid datanode stat format"))]
793 InvalidDatanodeStatFormat {
794 #[snafu(implicit)]
795 location: Location,
796 source: common_meta::error::Error,
797 },
798
799 #[snafu(display("Failed to serialize options to TOML"))]
800 TomlFormat {
801 #[snafu(implicit)]
802 location: Location,
803 #[snafu(source(from(common_config::error::Error, Box::new)))]
804 source: Box<common_config::error::Error>,
805 },
806
807 #[cfg(feature = "pg_kvbackend")]
808 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
809 PostgresExecution {
810 #[snafu(source)]
811 error: tokio_postgres::Error,
812 sql: String,
813 #[snafu(implicit)]
814 location: Location,
815 },
816
817 #[cfg(feature = "pg_kvbackend")]
818 #[snafu(display("Failed to get Postgres client"))]
819 GetPostgresClient {
820 #[snafu(implicit)]
821 location: Location,
822 #[snafu(source)]
823 error: deadpool::managed::PoolError<tokio_postgres::Error>,
824 },
825
826 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
827 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
828 SqlExecutionTimeout {
829 #[snafu(implicit)]
830 location: Location,
831 sql: String,
832 duration: std::time::Duration,
833 },
834
835 #[cfg(feature = "pg_kvbackend")]
836 #[snafu(display("Failed to create connection pool for Postgres"))]
837 CreatePostgresPool {
838 #[snafu(source)]
839 error: deadpool_postgres::CreatePoolError,
840 #[snafu(implicit)]
841 location: Location,
842 },
843
844 #[cfg(feature = "pg_kvbackend")]
845 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
846 GetPostgresConnection {
847 reason: String,
848 #[snafu(implicit)]
849 location: Location,
850 },
851
852 #[cfg(feature = "mysql_kvbackend")]
853 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
854 MySqlExecution {
855 #[snafu(source)]
856 error: sqlx::Error,
857 #[snafu(implicit)]
858 location: Location,
859 sql: String,
860 },
861
862 #[cfg(feature = "mysql_kvbackend")]
863 #[snafu(display("Failed to create mysql pool"))]
864 CreateMySqlPool {
865 #[snafu(source)]
866 error: sqlx::Error,
867 #[snafu(implicit)]
868 location: Location,
869 },
870
871 #[cfg(feature = "mysql_kvbackend")]
872 #[snafu(display("Failed to acquire mysql client from pool"))]
873 AcquireMySqlClient {
874 #[snafu(source)]
875 error: sqlx::Error,
876 #[snafu(implicit)]
877 location: Location,
878 },
879
880 #[snafu(display("Handler not found: {}", name))]
881 HandlerNotFound {
882 name: String,
883 #[snafu(implicit)]
884 location: Location,
885 },
886
887 #[snafu(display("Flow state handler error"))]
888 FlowStateHandler {
889 #[snafu(implicit)]
890 location: Location,
891 source: common_meta::error::Error,
892 },
893
894 #[snafu(display("Failed to build wal options allocator"))]
895 BuildWalOptionsAllocator {
896 #[snafu(implicit)]
897 location: Location,
898 source: common_meta::error::Error,
899 },
900
901 #[snafu(display("Failed to parse wal options"))]
902 ParseWalOptions {
903 #[snafu(implicit)]
904 location: Location,
905 source: common_meta::error::Error,
906 },
907
908 #[snafu(display("Failed to build kafka client."))]
909 BuildKafkaClient {
910 #[snafu(implicit)]
911 location: Location,
912 #[snafu(source)]
913 error: common_meta::error::Error,
914 },
915
916 #[snafu(display(
917 "Failed to build a Kafka partition client, topic: {}, partition: {}",
918 topic,
919 partition
920 ))]
921 BuildPartitionClient {
922 topic: String,
923 partition: i32,
924 #[snafu(implicit)]
925 location: Location,
926 #[snafu(source)]
927 error: rskafka::client::error::Error,
928 },
929
930 #[snafu(display(
931 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
932 topic,
933 partition,
934 offset
935 ))]
936 DeleteRecords {
937 #[snafu(implicit)]
938 location: Location,
939 #[snafu(source)]
940 error: rskafka::client::error::Error,
941 topic: String,
942 partition: i32,
943 offset: u64,
944 },
945
946 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
947 GetOffset {
948 topic: String,
949 #[snafu(implicit)]
950 location: Location,
951 #[snafu(source)]
952 error: rskafka::client::error::Error,
953 },
954
955 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
956 UpdateTopicNameValue {
957 topic: String,
958 #[snafu(implicit)]
959 location: Location,
960 #[snafu(source)]
961 source: common_meta::error::Error,
962 },
963}
964
965impl Error {
966 pub fn is_retryable(&self) -> bool {
968 matches!(self, Error::RetryLater { .. })
969 || matches!(self, Error::RetryLaterWithSource { .. })
970 }
971}
972
/// Convenience alias for results whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

// NOTE(review): presumably generates the `Error` -> `tonic::Status` conversion;
// confirm against the macro definition in `common_error`.
define_into_tonic_status!(Error);
976
977impl ErrorExt for Error {
978 fn status_code(&self) -> StatusCode {
979 match self {
980 Error::EtcdFailed { .. }
981 | Error::ConnectEtcd { .. }
982 | Error::FileIo { .. }
983 | Error::TcpBind { .. }
984 | Error::SerializeToJson { .. }
985 | Error::DeserializeFromJson { .. }
986 | Error::NoLeader { .. }
987 | Error::LeaderLeaseExpired { .. }
988 | Error::LeaderLeaseChanged { .. }
989 | Error::CreateChannel { .. }
990 | Error::BatchGet { .. }
991 | Error::Range { .. }
992 | Error::ResponseHeaderNotFound { .. }
993 | Error::InvalidHttpBody { .. }
994 | Error::ExceededRetryLimit { .. }
995 | Error::SendShutdownSignal { .. }
996 | Error::PushMessage { .. }
997 | Error::MailboxClosed { .. }
998 | Error::MailboxReceiver { .. }
999 | Error::StartGrpc { .. }
1000 | Error::PublishMessage { .. }
1001 | Error::Join { .. }
1002 | Error::ChooseItems { .. }
1003 | Error::FlowStateHandler { .. }
1004 | Error::BuildWalOptionsAllocator { .. }
1005 | Error::BuildPartitionClient { .. }
1006 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1007
1008 Error::DeleteRecords { .. }
1009 | Error::GetOffset { .. }
1010 | Error::PeerUnavailable { .. }
1011 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1012 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1013 Error::PruneTaskAlreadyRunning { .. }
1014 | Error::RetryLater { .. }
1015 | Error::MailboxChannelClosed { .. }
1016 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1017 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1018
1019 Error::Unsupported { .. } => StatusCode::Unsupported,
1020
1021 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1022
1023 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1024 Error::EmptyKey { .. }
1025 | Error::MissingRequiredParameter { .. }
1026 | Error::MissingRequestHeader { .. }
1027 | Error::InvalidLeaseKey { .. }
1028 | Error::InvalidStatKey { .. }
1029 | Error::InvalidInactiveRegionKey { .. }
1030 | Error::ParseNum { .. }
1031 | Error::ParseBool { .. }
1032 | Error::ParseAddr { .. }
1033 | Error::UnsupportedSelectorType { .. }
1034 | Error::InvalidArguments { .. }
1035 | Error::InitExportMetricsTask { .. }
1036 | Error::ProcedureNotFound { .. }
1037 | Error::TooManyPartitions { .. }
1038 | Error::TomlFormat { .. }
1039 | Error::HandlerNotFound { .. }
1040 | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
1041 Error::LeaseKeyFromUtf8 { .. }
1042 | Error::LeaseValueFromUtf8 { .. }
1043 | Error::InvalidRegionKeyFromUtf8 { .. }
1044 | Error::TableRouteNotFound { .. }
1045 | Error::TableInfoNotFound { .. }
1046 | Error::DatanodeTableNotFound { .. }
1047 | Error::InvalidUtf8Value { .. }
1048 | Error::UnexpectedInstructionReply { .. }
1049 | Error::Unexpected { .. }
1050 | Error::RegionOpeningRace { .. }
1051 | Error::RegionRouteNotFound { .. }
1052 | Error::MigrationAbort { .. }
1053 | Error::MigrationRunning { .. }
1054 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1055 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1056 Error::SaveClusterInfo { source, .. }
1057 | Error::InvalidClusterInfoFormat { source, .. }
1058 | Error::InvalidDatanodeStatFormat { source, .. } => source.status_code(),
1059 Error::InvalidateTableCache { source, .. } => source.status_code(),
1060 Error::SubmitProcedure { source, .. }
1061 | Error::WaitProcedure { source, .. }
1062 | Error::QueryProcedure { source, .. } => source.status_code(),
1063 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1064 source.status_code()
1065 }
1066 Error::StartProcedureManager { source, .. }
1067 | Error::StopProcedureManager { source, .. } => source.status_code(),
1068
1069 Error::ListCatalogs { source, .. }
1070 | Error::ListSchemas { source, .. }
1071 | Error::ListTables { source, .. } => source.status_code(),
1072 Error::StartTelemetryTask { source, .. } => source.status_code(),
1073
1074 Error::NextSequence { source, .. }
1075 | Error::SetNextSequence { source, .. }
1076 | Error::PeekSequence { source, .. } => source.status_code(),
1077 Error::DowngradeLeader { source, .. } => source.status_code(),
1078 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1079 Error::SubmitDdlTask { source, .. }
1080 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1081 Error::ConvertProtoData { source, .. }
1082 | Error::TableMetadataManager { source, .. }
1083 | Error::RuntimeSwitchManager { source, .. }
1084 | Error::KvBackend { source, .. }
1085 | Error::UnexpectedLogicalRouteTable { source, .. }
1086 | Error::UpdateTopicNameValue { source, .. }
1087 | Error::ParseWalOptions { source, .. } => source.status_code(),
1088 Error::LookupFrontends { source, .. } => source.status_code(),
1089 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1090
1091 Error::InitMetadata { source, .. }
1092 | Error::InitDdlManager { source, .. }
1093 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1094
1095 Error::Other { source, .. } => source.status_code(),
1096 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1097
1098 #[cfg(feature = "pg_kvbackend")]
1099 Error::CreatePostgresPool { .. }
1100 | Error::GetPostgresClient { .. }
1101 | Error::GetPostgresConnection { .. }
1102 | Error::PostgresExecution { .. } => StatusCode::Internal,
1103 #[cfg(feature = "mysql_kvbackend")]
1104 Error::MySqlExecution { .. }
1105 | Error::CreateMySqlPool { .. }
1106 | Error::ParseMySqlUrl { .. }
1107 | Error::DecodeSqlValue { .. }
1108 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1109 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1110 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1111 }
1112 }
1113
1114 fn as_any(&self) -> &dyn std::any::Any {
1115 self
1116 }
1117}
1118
1119pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1121 let mut err: &(dyn std::error::Error + 'static) = err_status;
1122
1123 loop {
1124 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1125 return Some(io_err);
1126 }
1127
1128 if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
1131 if let Some(io_err) = h2_err.get_io() {
1132 return Some(io_err);
1133 }
1134 }
1135
1136 err = err.source()?;
1137 }
1138}