1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Failed to choose items"))]
36 ChooseItems {
37 #[snafu(implicit)]
38 location: Location,
39 #[snafu(source)]
40 error: rand::distr::weighted::Error,
41 },
42
43 #[snafu(display("Exceeded deadline, operation: {}", operation))]
44 ExceededDeadline {
45 #[snafu(implicit)]
46 location: Location,
47 operation: String,
48 },
49
50 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51 PeerUnavailable {
52 #[snafu(implicit)]
53 location: Location,
54 peer_id: u64,
55 },
56
57 #[snafu(display("Failed to list active frontends"))]
58 ListActiveFrontends {
59 #[snafu(implicit)]
60 location: Location,
61 source: common_meta::error::Error,
62 },
63
64 #[snafu(display("Failed to list active datanodes"))]
65 ListActiveDatanodes {
66 #[snafu(implicit)]
67 location: Location,
68 source: common_meta::error::Error,
69 },
70
71 #[snafu(display("Failed to list active flownodes"))]
72 ListActiveFlownodes {
73 #[snafu(implicit)]
74 location: Location,
75 source: common_meta::error::Error,
76 },
77
78 #[snafu(display("No available frontend"))]
79 NoAvailableFrontend {
80 #[snafu(implicit)]
81 location: Location,
82 },
83
84 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
85 MigrationRunning {
86 #[snafu(implicit)]
87 location: Location,
88 region_id: RegionId,
89 },
90
91 #[snafu(display(
92 "The region migration procedure is completed for region: {}, target_peer: {}",
93 region_id,
94 target_peer_id
95 ))]
96 RegionMigrated {
97 #[snafu(implicit)]
98 location: Location,
99 region_id: RegionId,
100 target_peer_id: u64,
101 },
102
103 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
104 MigrationAbort {
105 #[snafu(implicit)]
106 location: Location,
107 reason: String,
108 },
109
110 #[snafu(display(
111 "Another procedure is opening the region: {} on peer: {}",
112 region_id,
113 peer_id
114 ))]
115 RegionOpeningRace {
116 #[snafu(implicit)]
117 location: Location,
118 peer_id: DatanodeId,
119 region_id: RegionId,
120 },
121
122 #[snafu(display("Failed to init ddl manager"))]
123 InitDdlManager {
124 #[snafu(implicit)]
125 location: Location,
126 source: common_meta::error::Error,
127 },
128
129 #[snafu(display("Failed to init reconciliation manager"))]
130 InitReconciliationManager {
131 #[snafu(implicit)]
132 location: Location,
133 source: common_meta::error::Error,
134 },
135
136 #[snafu(display("Failed to create default catalog and schema"))]
137 InitMetadata {
138 #[snafu(implicit)]
139 location: Location,
140 source: common_meta::error::Error,
141 },
142
143 #[snafu(display("Failed to allocate next sequence number"))]
144 NextSequence {
145 #[snafu(implicit)]
146 location: Location,
147 source: common_meta::error::Error,
148 },
149
150 #[snafu(display("Failed to set next sequence number"))]
151 SetNextSequence {
152 #[snafu(implicit)]
153 location: Location,
154 source: common_meta::error::Error,
155 },
156
157 #[snafu(display("Failed to peek sequence number"))]
158 PeekSequence {
159 #[snafu(implicit)]
160 location: Location,
161 source: common_meta::error::Error,
162 },
163
164 #[snafu(display("Failed to start telemetry task"))]
165 StartTelemetryTask {
166 #[snafu(implicit)]
167 location: Location,
168 source: common_runtime::error::Error,
169 },
170
171 #[snafu(display("Failed to submit ddl task"))]
172 SubmitDdlTask {
173 #[snafu(implicit)]
174 location: Location,
175 source: common_meta::error::Error,
176 },
177
178 #[snafu(display("Failed to submit reconcile procedure"))]
179 SubmitReconcileProcedure {
180 #[snafu(implicit)]
181 location: Location,
182 source: common_meta::error::Error,
183 },
184
185 #[snafu(display("Failed to invalidate table cache"))]
186 InvalidateTableCache {
187 #[snafu(implicit)]
188 location: Location,
189 source: common_meta::error::Error,
190 },
191
192 #[snafu(display("Failed to list catalogs"))]
193 ListCatalogs {
194 #[snafu(implicit)]
195 location: Location,
196 source: BoxedError,
197 },
198
199 #[snafu(display("Failed to list {}'s schemas", catalog))]
200 ListSchemas {
201 #[snafu(implicit)]
202 location: Location,
203 catalog: String,
204 source: BoxedError,
205 },
206
207 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
208 ListTables {
209 #[snafu(implicit)]
210 location: Location,
211 catalog: String,
212 schema: String,
213 source: BoxedError,
214 },
215
216 #[snafu(display("Failed to join a future"))]
217 Join {
218 #[snafu(implicit)]
219 location: Location,
220 #[snafu(source)]
221 error: JoinError,
222 },
223
224 #[snafu(display(
225 "Failed to request {}, required: {}, but only {} available",
226 select_target,
227 required,
228 available
229 ))]
230 NoEnoughAvailableNode {
231 #[snafu(implicit)]
232 location: Location,
233 required: usize,
234 available: usize,
235 select_target: SelectTarget,
236 },
237
238 #[snafu(display("Failed to send shutdown signal"))]
239 SendShutdownSignal {
240 #[snafu(source)]
241 error: SendError<()>,
242 },
243
244 #[snafu(display("Failed to shutdown {} server", server))]
245 ShutdownServer {
246 #[snafu(implicit)]
247 location: Location,
248 source: servers::error::Error,
249 server: String,
250 },
251
252 #[snafu(display("Empty key is not allowed"))]
253 EmptyKey {
254 #[snafu(implicit)]
255 location: Location,
256 },
257
258 #[snafu(display("Failed to execute via Etcd"))]
259 EtcdFailed {
260 #[snafu(source)]
261 error: etcd_client::Error,
262 #[snafu(implicit)]
263 location: Location,
264 },
265
266 #[snafu(display("Failed to connect to Etcd"))]
267 ConnectEtcd {
268 #[snafu(source)]
269 error: etcd_client::Error,
270 #[snafu(implicit)]
271 location: Location,
272 },
273
274 #[snafu(display("Failed to read file: {}", path))]
275 FileIo {
276 #[snafu(source)]
277 error: std::io::Error,
278 #[snafu(implicit)]
279 location: Location,
280 path: String,
281 },
282
283 #[snafu(display("Failed to bind address {}", addr))]
284 TcpBind {
285 addr: String,
286 #[snafu(source)]
287 error: std::io::Error,
288 #[snafu(implicit)]
289 location: Location,
290 },
291
292 #[snafu(display("Failed to start gRPC server"))]
293 StartGrpc {
294 #[snafu(source)]
295 error: tonic::transport::Error,
296 #[snafu(implicit)]
297 location: Location,
298 },
299
300 #[snafu(display("Failed to start http server"))]
301 StartHttp {
302 #[snafu(implicit)]
303 location: Location,
304 source: servers::error::Error,
305 },
306
307 #[snafu(display("Failed to init export metrics task"))]
308 InitExportMetricsTask {
309 #[snafu(implicit)]
310 location: Location,
311 source: servers::error::Error,
312 },
313
314 #[snafu(display("Failed to parse address {}", addr))]
315 ParseAddr {
316 addr: String,
317 #[snafu(source)]
318 error: std::net::AddrParseError,
319 },
320
321 #[snafu(display("Invalid lease key: {}", key))]
322 InvalidLeaseKey {
323 key: String,
324 #[snafu(implicit)]
325 location: Location,
326 },
327
328 #[snafu(display("Invalid datanode stat key: {}", key))]
329 InvalidStatKey {
330 key: String,
331 #[snafu(implicit)]
332 location: Location,
333 },
334
335 #[snafu(display("Invalid inactive region key: {}", key))]
336 InvalidInactiveRegionKey {
337 key: String,
338 #[snafu(implicit)]
339 location: Location,
340 },
341
342 #[snafu(display("Failed to parse lease key from utf8"))]
343 LeaseKeyFromUtf8 {
344 #[snafu(source)]
345 error: std::string::FromUtf8Error,
346 #[snafu(implicit)]
347 location: Location,
348 },
349
350 #[snafu(display("Failed to parse lease value from utf8"))]
351 LeaseValueFromUtf8 {
352 #[snafu(source)]
353 error: std::string::FromUtf8Error,
354 #[snafu(implicit)]
355 location: Location,
356 },
357
358 #[snafu(display("Failed to parse invalid region key from utf8"))]
359 InvalidRegionKeyFromUtf8 {
360 #[snafu(source)]
361 error: std::string::FromUtf8Error,
362 #[snafu(implicit)]
363 location: Location,
364 },
365
366 #[snafu(display("Failed to serialize to json: {}", input))]
367 SerializeToJson {
368 input: String,
369 #[snafu(source)]
370 error: serde_json::error::Error,
371 #[snafu(implicit)]
372 location: Location,
373 },
374
375 #[snafu(display("Failed to deserialize from json: {}", input))]
376 DeserializeFromJson {
377 input: String,
378 #[snafu(source)]
379 error: serde_json::error::Error,
380 #[snafu(implicit)]
381 location: Location,
382 },
383
384 #[snafu(display("Failed to parse number: {}", err_msg))]
385 ParseNum {
386 err_msg: String,
387 #[snafu(source)]
388 error: std::num::ParseIntError,
389 #[snafu(implicit)]
390 location: Location,
391 },
392
393 #[snafu(display("Failed to parse bool: {}", err_msg))]
394 ParseBool {
395 err_msg: String,
396 #[snafu(source)]
397 error: std::str::ParseBoolError,
398 #[snafu(implicit)]
399 location: Location,
400 },
401
402 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
403 DowngradeLeader {
404 region_id: RegionId,
405 #[snafu(implicit)]
406 location: Location,
407 #[snafu(source)]
408 source: BoxedError,
409 },
410
411 #[snafu(display("Region's leader peer changed: {}", msg))]
412 LeaderPeerChanged {
413 msg: String,
414 #[snafu(implicit)]
415 location: Location,
416 },
417
418 #[snafu(display("Invalid arguments: {}", err_msg))]
419 InvalidArguments {
420 err_msg: String,
421 #[snafu(implicit)]
422 location: Location,
423 },
424
425 #[cfg(feature = "mysql_kvbackend")]
426 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
427 ParseMySqlUrl {
428 #[snafu(source)]
429 error: sqlx::error::Error,
430 mysql_url: String,
431 #[snafu(implicit)]
432 location: Location,
433 },
434
435 #[cfg(feature = "mysql_kvbackend")]
436 #[snafu(display("Failed to decode sql value"))]
437 DecodeSqlValue {
438 #[snafu(source)]
439 error: sqlx::error::Error,
440 #[snafu(implicit)]
441 location: Location,
442 },
443
444 #[snafu(display("Failed to find table route for {table_id}"))]
445 TableRouteNotFound {
446 table_id: TableId,
447 #[snafu(implicit)]
448 location: Location,
449 },
450
451 #[snafu(display("Failed to find table route for {region_id}"))]
452 RegionRouteNotFound {
453 region_id: RegionId,
454 #[snafu(implicit)]
455 location: Location,
456 },
457
458 #[snafu(display("Table info not found: {}", table_id))]
459 TableInfoNotFound {
460 table_id: TableId,
461 #[snafu(implicit)]
462 location: Location,
463 },
464
465 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
466 DatanodeTableNotFound {
467 table_id: TableId,
468 datanode_id: DatanodeId,
469 #[snafu(implicit)]
470 location: Location,
471 },
472
473 #[snafu(display("Metasrv has no leader at this moment"))]
474 NoLeader {
475 #[snafu(implicit)]
476 location: Location,
477 },
478
479 #[snafu(display("Leader lease expired"))]
480 LeaderLeaseExpired {
481 #[snafu(implicit)]
482 location: Location,
483 },
484
485 #[snafu(display("Leader lease changed during election"))]
486 LeaderLeaseChanged {
487 #[snafu(implicit)]
488 location: Location,
489 },
490
491 #[snafu(display("Table {} not found", name))]
492 TableNotFound {
493 name: String,
494 #[snafu(implicit)]
495 location: Location,
496 },
497
498 #[snafu(display("Unsupported selector type, {}", selector_type))]
499 UnsupportedSelectorType {
500 selector_type: String,
501 #[snafu(implicit)]
502 location: Location,
503 },
504
505 #[snafu(display("Unexpected, violated: {violated}"))]
506 Unexpected {
507 violated: String,
508 #[snafu(implicit)]
509 location: Location,
510 },
511
512 #[snafu(display("Failed to create gRPC channel"))]
513 CreateChannel {
514 #[snafu(implicit)]
515 location: Location,
516 source: common_grpc::error::Error,
517 },
518
519 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
520 BatchGet {
521 #[snafu(source)]
522 error: tonic::Status,
523 #[snafu(implicit)]
524 location: Location,
525 },
526
527 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
528 Range {
529 #[snafu(source)]
530 error: tonic::Status,
531 #[snafu(implicit)]
532 location: Location,
533 },
534
535 #[snafu(display("Response header not found"))]
536 ResponseHeaderNotFound {
537 #[snafu(implicit)]
538 location: Location,
539 },
540
541 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
542 IsNotLeader {
543 node_addr: String,
544 #[snafu(implicit)]
545 location: Location,
546 },
547
548 #[snafu(display("Invalid http body"))]
549 InvalidHttpBody {
550 #[snafu(source)]
551 error: http::Error,
552 #[snafu(implicit)]
553 location: Location,
554 },
555
556 #[snafu(display(
557 "The number of retries for the grpc call {} exceeded the limit, {}",
558 func_name,
559 retry_num
560 ))]
561 ExceededRetryLimit {
562 func_name: String,
563 retry_num: usize,
564 #[snafu(implicit)]
565 location: Location,
566 },
567
568 #[snafu(display("Invalid utf-8 value"))]
569 InvalidUtf8Value {
570 #[snafu(source)]
571 error: std::string::FromUtf8Error,
572 #[snafu(implicit)]
573 location: Location,
574 },
575
576 #[snafu(display("Missing required parameter, param: {:?}", param))]
577 MissingRequiredParameter { param: String },
578
579 #[snafu(display("Failed to start procedure manager"))]
580 StartProcedureManager {
581 #[snafu(implicit)]
582 location: Location,
583 source: common_procedure::Error,
584 },
585
586 #[snafu(display("Failed to stop procedure manager"))]
587 StopProcedureManager {
588 #[snafu(implicit)]
589 location: Location,
590 source: common_procedure::Error,
591 },
592
593 #[snafu(display("Failed to wait procedure done"))]
594 WaitProcedure {
595 #[snafu(implicit)]
596 location: Location,
597 source: common_procedure::Error,
598 },
599
600 #[snafu(display("Failed to query procedure state"))]
601 QueryProcedure {
602 #[snafu(implicit)]
603 location: Location,
604 source: common_procedure::Error,
605 },
606
607 #[snafu(display("Procedure not found: {pid}"))]
608 ProcedureNotFound {
609 #[snafu(implicit)]
610 location: Location,
611 pid: String,
612 },
613
614 #[snafu(display("Failed to submit procedure"))]
615 SubmitProcedure {
616 #[snafu(implicit)]
617 location: Location,
618 source: common_procedure::Error,
619 },
620
621 #[snafu(display("A prune task for topic {} is already running", topic))]
622 PruneTaskAlreadyRunning {
623 topic: String,
624 #[snafu(implicit)]
625 location: Location,
626 },
627
628 #[snafu(display("Schema already exists, name: {schema_name}"))]
629 SchemaAlreadyExists {
630 schema_name: String,
631 #[snafu(implicit)]
632 location: Location,
633 },
634
635 #[snafu(display("Table already exists: {table_name}"))]
636 TableAlreadyExists {
637 table_name: String,
638 #[snafu(implicit)]
639 location: Location,
640 },
641
642 #[snafu(display("Pusher not found: {pusher_id}"))]
643 PusherNotFound {
644 pusher_id: String,
645 #[snafu(implicit)]
646 location: Location,
647 },
648
649 #[snafu(display("Failed to push message: {err_msg}"))]
650 PushMessage {
651 err_msg: String,
652 #[snafu(implicit)]
653 location: Location,
654 },
655
656 #[snafu(display("Mailbox already closed: {id}"))]
657 MailboxClosed {
658 id: u64,
659 #[snafu(implicit)]
660 location: Location,
661 },
662
663 #[snafu(display("Mailbox timeout: {id}"))]
664 MailboxTimeout {
665 id: u64,
666 #[snafu(implicit)]
667 location: Location,
668 },
669
670 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
671 MailboxReceiver {
672 id: u64,
673 err_msg: String,
674 #[snafu(implicit)]
675 location: Location,
676 },
677
678 #[snafu(display("Mailbox channel closed: {channel}"))]
679 MailboxChannelClosed {
680 channel: Channel,
681 #[snafu(implicit)]
682 location: Location,
683 },
684
685 #[snafu(display("Missing request header"))]
686 MissingRequestHeader {
687 #[snafu(implicit)]
688 location: Location,
689 },
690
691 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
692 RegisterProcedureLoader {
693 type_name: String,
694 #[snafu(implicit)]
695 location: Location,
696 source: common_procedure::error::Error,
697 },
698
699 #[snafu(display(
700 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
701 mailbox_message,
702 reason
703 ))]
704 UnexpectedInstructionReply {
705 mailbox_message: String,
706 reason: String,
707 #[snafu(implicit)]
708 location: Location,
709 },
710
711 #[snafu(display("Expected to retry later, reason: {}", reason))]
712 RetryLater {
713 reason: String,
714 #[snafu(implicit)]
715 location: Location,
716 },
717
718 #[snafu(display("Expected to retry later, reason: {}", reason))]
719 RetryLaterWithSource {
720 reason: String,
721 #[snafu(implicit)]
722 location: Location,
723 source: BoxedError,
724 },
725
726 #[snafu(display("Failed to convert proto data"))]
727 ConvertProtoData {
728 #[snafu(implicit)]
729 location: Location,
730 source: common_meta::error::Error,
731 },
732
733 #[snafu(display("Other error"))]
736 Other {
737 source: BoxedError,
738 #[snafu(implicit)]
739 location: Location,
740 },
741
742 #[snafu(display("Table metadata manager error"))]
743 TableMetadataManager {
744 source: common_meta::error::Error,
745 #[snafu(implicit)]
746 location: Location,
747 },
748
749 #[snafu(display("Runtime switch manager error"))]
750 RuntimeSwitchManager {
751 source: common_meta::error::Error,
752 #[snafu(implicit)]
753 location: Location,
754 },
755
756 #[snafu(display("Keyvalue backend error"))]
757 KvBackend {
758 source: common_meta::error::Error,
759 #[snafu(implicit)]
760 location: Location,
761 },
762
763 #[snafu(display("Failed to publish message"))]
764 PublishMessage {
765 #[snafu(source)]
766 error: SendError<Message>,
767 #[snafu(implicit)]
768 location: Location,
769 },
770
771 #[snafu(display("Too many partitions"))]
772 TooManyPartitions {
773 #[snafu(implicit)]
774 location: Location,
775 },
776
777 #[snafu(display("Unsupported operation {}", operation))]
778 Unsupported {
779 operation: String,
780 #[snafu(implicit)]
781 location: Location,
782 },
783
784 #[snafu(display("Unexpected table route type: {}", err_msg))]
785 UnexpectedLogicalRouteTable {
786 #[snafu(implicit)]
787 location: Location,
788 err_msg: String,
789 source: common_meta::error::Error,
790 },
791
792 #[snafu(display("Failed to save cluster info"))]
793 SaveClusterInfo {
794 #[snafu(implicit)]
795 location: Location,
796 source: common_meta::error::Error,
797 },
798
799 #[snafu(display("Invalid cluster info format"))]
800 InvalidClusterInfoFormat {
801 #[snafu(implicit)]
802 location: Location,
803 source: common_meta::error::Error,
804 },
805
806 #[snafu(display("Invalid datanode stat format"))]
807 InvalidDatanodeStatFormat {
808 #[snafu(implicit)]
809 location: Location,
810 source: common_meta::error::Error,
811 },
812
813 #[snafu(display("Invalid node info format"))]
814 InvalidNodeInfoFormat {
815 #[snafu(implicit)]
816 location: Location,
817 source: common_meta::error::Error,
818 },
819
820 #[snafu(display("Failed to serialize options to TOML"))]
821 TomlFormat {
822 #[snafu(implicit)]
823 location: Location,
824 #[snafu(source(from(common_config::error::Error, Box::new)))]
825 source: Box<common_config::error::Error>,
826 },
827
828 #[cfg(feature = "pg_kvbackend")]
829 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
830 PostgresExecution {
831 #[snafu(source)]
832 error: tokio_postgres::Error,
833 sql: String,
834 #[snafu(implicit)]
835 location: Location,
836 },
837
838 #[cfg(feature = "pg_kvbackend")]
839 #[snafu(display("Failed to get Postgres client"))]
840 GetPostgresClient {
841 #[snafu(implicit)]
842 location: Location,
843 #[snafu(source)]
844 error: deadpool::managed::PoolError<tokio_postgres::Error>,
845 },
846
847 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
848 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
849 SqlExecutionTimeout {
850 #[snafu(implicit)]
851 location: Location,
852 sql: String,
853 duration: std::time::Duration,
854 },
855
856 #[cfg(feature = "pg_kvbackend")]
857 #[snafu(display("Failed to create connection pool for Postgres"))]
858 CreatePostgresPool {
859 #[snafu(source)]
860 error: deadpool_postgres::CreatePoolError,
861 #[snafu(implicit)]
862 location: Location,
863 },
864
865 #[cfg(feature = "pg_kvbackend")]
866 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
867 GetPostgresConnection {
868 reason: String,
869 #[snafu(implicit)]
870 location: Location,
871 },
872
873 #[cfg(feature = "mysql_kvbackend")]
874 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
875 MySqlExecution {
876 #[snafu(source)]
877 error: sqlx::Error,
878 #[snafu(implicit)]
879 location: Location,
880 sql: String,
881 },
882
883 #[cfg(feature = "mysql_kvbackend")]
884 #[snafu(display("Failed to create mysql pool"))]
885 CreateMySqlPool {
886 #[snafu(source)]
887 error: sqlx::Error,
888 #[snafu(implicit)]
889 location: Location,
890 },
891
892 #[cfg(feature = "mysql_kvbackend")]
893 #[snafu(display("Failed to acquire mysql client from pool"))]
894 AcquireMySqlClient {
895 #[snafu(source)]
896 error: sqlx::Error,
897 #[snafu(implicit)]
898 location: Location,
899 },
900
901 #[snafu(display("Handler not found: {}", name))]
902 HandlerNotFound {
903 name: String,
904 #[snafu(implicit)]
905 location: Location,
906 },
907
908 #[snafu(display("Flow state handler error"))]
909 FlowStateHandler {
910 #[snafu(implicit)]
911 location: Location,
912 source: common_meta::error::Error,
913 },
914
915 #[snafu(display("Failed to build wal options allocator"))]
916 BuildWalOptionsAllocator {
917 #[snafu(implicit)]
918 location: Location,
919 source: common_meta::error::Error,
920 },
921
922 #[snafu(display("Failed to parse wal options"))]
923 ParseWalOptions {
924 #[snafu(implicit)]
925 location: Location,
926 source: common_meta::error::Error,
927 },
928
929 #[snafu(display("Failed to build kafka client."))]
930 BuildKafkaClient {
931 #[snafu(implicit)]
932 location: Location,
933 #[snafu(source)]
934 error: common_meta::error::Error,
935 },
936
937 #[snafu(display(
938 "Failed to build a Kafka partition client, topic: {}, partition: {}",
939 topic,
940 partition
941 ))]
942 BuildPartitionClient {
943 topic: String,
944 partition: i32,
945 #[snafu(implicit)]
946 location: Location,
947 #[snafu(source)]
948 error: rskafka::client::error::Error,
949 },
950
951 #[snafu(display(
952 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
953 topic,
954 partition,
955 offset
956 ))]
957 DeleteRecords {
958 #[snafu(implicit)]
959 location: Location,
960 #[snafu(source)]
961 error: rskafka::client::error::Error,
962 topic: String,
963 partition: i32,
964 offset: u64,
965 },
966
967 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
968 GetOffset {
969 topic: String,
970 #[snafu(implicit)]
971 location: Location,
972 #[snafu(source)]
973 error: rskafka::client::error::Error,
974 },
975
976 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
977 UpdateTopicNameValue {
978 topic: String,
979 #[snafu(implicit)]
980 location: Location,
981 #[snafu(source)]
982 source: common_meta::error::Error,
983 },
984
985 #[snafu(display("Failed to build tls options"))]
986 BuildTlsOptions {
987 #[snafu(implicit)]
988 location: Location,
989 #[snafu(source)]
990 source: common_meta::error::Error,
991 },
992}
993
994impl Error {
995 pub fn is_retryable(&self) -> bool {
997 matches!(self, Error::RetryLater { .. })
998 || matches!(self, Error::RetryLaterWithSource { .. })
999 }
1000}
1001
1002pub type Result<T> = std::result::Result<T, Error>;
1003
1004define_into_tonic_status!(Error);
1005
1006impl ErrorExt for Error {
1007 fn status_code(&self) -> StatusCode {
1008 match self {
1009 Error::EtcdFailed { .. }
1010 | Error::ConnectEtcd { .. }
1011 | Error::FileIo { .. }
1012 | Error::TcpBind { .. }
1013 | Error::SerializeToJson { .. }
1014 | Error::DeserializeFromJson { .. }
1015 | Error::NoLeader { .. }
1016 | Error::LeaderLeaseExpired { .. }
1017 | Error::LeaderLeaseChanged { .. }
1018 | Error::CreateChannel { .. }
1019 | Error::BatchGet { .. }
1020 | Error::Range { .. }
1021 | Error::ResponseHeaderNotFound { .. }
1022 | Error::InvalidHttpBody { .. }
1023 | Error::ExceededRetryLimit { .. }
1024 | Error::SendShutdownSignal { .. }
1025 | Error::PushMessage { .. }
1026 | Error::MailboxClosed { .. }
1027 | Error::MailboxReceiver { .. }
1028 | Error::StartGrpc { .. }
1029 | Error::PublishMessage { .. }
1030 | Error::Join { .. }
1031 | Error::ChooseItems { .. }
1032 | Error::FlowStateHandler { .. }
1033 | Error::BuildWalOptionsAllocator { .. }
1034 | Error::BuildPartitionClient { .. }
1035 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1036
1037 Error::DeleteRecords { .. }
1038 | Error::GetOffset { .. }
1039 | Error::PeerUnavailable { .. }
1040 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1041 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1042 Error::PruneTaskAlreadyRunning { .. }
1043 | Error::RetryLater { .. }
1044 | Error::MailboxChannelClosed { .. }
1045 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1046 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1047
1048 Error::Unsupported { .. } => StatusCode::Unsupported,
1049
1050 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1051
1052 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1053 Error::EmptyKey { .. }
1054 | Error::MissingRequiredParameter { .. }
1055 | Error::MissingRequestHeader { .. }
1056 | Error::InvalidLeaseKey { .. }
1057 | Error::InvalidStatKey { .. }
1058 | Error::InvalidInactiveRegionKey { .. }
1059 | Error::ParseNum { .. }
1060 | Error::ParseBool { .. }
1061 | Error::ParseAddr { .. }
1062 | Error::UnsupportedSelectorType { .. }
1063 | Error::InvalidArguments { .. }
1064 | Error::InitExportMetricsTask { .. }
1065 | Error::ProcedureNotFound { .. }
1066 | Error::TooManyPartitions { .. }
1067 | Error::TomlFormat { .. }
1068 | Error::HandlerNotFound { .. }
1069 | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
1070 Error::LeaseKeyFromUtf8 { .. }
1071 | Error::LeaseValueFromUtf8 { .. }
1072 | Error::InvalidRegionKeyFromUtf8 { .. }
1073 | Error::TableRouteNotFound { .. }
1074 | Error::TableInfoNotFound { .. }
1075 | Error::DatanodeTableNotFound { .. }
1076 | Error::InvalidUtf8Value { .. }
1077 | Error::UnexpectedInstructionReply { .. }
1078 | Error::Unexpected { .. }
1079 | Error::RegionOpeningRace { .. }
1080 | Error::RegionRouteNotFound { .. }
1081 | Error::MigrationAbort { .. }
1082 | Error::MigrationRunning { .. }
1083 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1084 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1085 Error::SaveClusterInfo { source, .. }
1086 | Error::InvalidClusterInfoFormat { source, .. }
1087 | Error::InvalidDatanodeStatFormat { source, .. }
1088 | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1089 Error::InvalidateTableCache { source, .. } => source.status_code(),
1090 Error::SubmitProcedure { source, .. }
1091 | Error::WaitProcedure { source, .. }
1092 | Error::QueryProcedure { source, .. } => source.status_code(),
1093 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1094 source.status_code()
1095 }
1096 Error::StartProcedureManager { source, .. }
1097 | Error::StopProcedureManager { source, .. } => source.status_code(),
1098
1099 Error::ListCatalogs { source, .. }
1100 | Error::ListSchemas { source, .. }
1101 | Error::ListTables { source, .. } => source.status_code(),
1102 Error::StartTelemetryTask { source, .. } => source.status_code(),
1103
1104 Error::NextSequence { source, .. }
1105 | Error::SetNextSequence { source, .. }
1106 | Error::PeekSequence { source, .. } => source.status_code(),
1107 Error::DowngradeLeader { source, .. } => source.status_code(),
1108 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1109 Error::SubmitDdlTask { source, .. }
1110 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1111 Error::ConvertProtoData { source, .. }
1112 | Error::TableMetadataManager { source, .. }
1113 | Error::RuntimeSwitchManager { source, .. }
1114 | Error::KvBackend { source, .. }
1115 | Error::UnexpectedLogicalRouteTable { source, .. }
1116 | Error::UpdateTopicNameValue { source, .. }
1117 | Error::ParseWalOptions { source, .. } => source.status_code(),
1118 Error::ListActiveFrontends { source, .. }
1119 | Error::ListActiveDatanodes { source, .. }
1120 | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1121 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1122
1123 Error::InitMetadata { source, .. }
1124 | Error::InitDdlManager { source, .. }
1125 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1126
1127 Error::BuildTlsOptions { source, .. } => source.status_code(),
1128 Error::Other { source, .. } => source.status_code(),
1129 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1130
1131 #[cfg(feature = "pg_kvbackend")]
1132 Error::CreatePostgresPool { .. }
1133 | Error::GetPostgresClient { .. }
1134 | Error::GetPostgresConnection { .. }
1135 | Error::PostgresExecution { .. } => StatusCode::Internal,
1136 #[cfg(feature = "mysql_kvbackend")]
1137 Error::MySqlExecution { .. }
1138 | Error::CreateMySqlPool { .. }
1139 | Error::ParseMySqlUrl { .. }
1140 | Error::DecodeSqlValue { .. }
1141 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1142 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1143 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1144 }
1145 }
1146
1147 fn as_any(&self) -> &dyn std::any::Any {
1148 self
1149 }
1150}
1151
1152pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1154 let mut err: &(dyn std::error::Error + 'static) = err_status;
1155
1156 loop {
1157 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1158 return Some(io_err);
1159 }
1160
1161 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1164 && let Some(io_err) = h2_err.get_io()
1165 {
1166 return Some(io_err);
1167 }
1168
1169 err = err.source()?;
1170 }
1171}