1use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26use uuid::Uuid;
27
28use crate::metasrv::SelectTarget;
29use crate::pubsub::Message;
30use crate::service::mailbox::Channel;
31
32#[derive(Snafu)]
33#[snafu(visibility(pub))]
34#[stack_trace_debug]
35pub enum Error {
36 #[snafu(display("Failed to choose items"))]
37 ChooseItems {
38 #[snafu(implicit)]
39 location: Location,
40 #[snafu(source)]
41 error: rand::distr::weighted::Error,
42 },
43
44 #[snafu(display("Exceeded deadline, operation: {}", operation))]
45 ExceededDeadline {
46 #[snafu(implicit)]
47 location: Location,
48 operation: String,
49 },
50
51 #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
52 PeerUnavailable {
53 #[snafu(implicit)]
54 location: Location,
55 peer_id: u64,
56 },
57
58 #[snafu(display("Failed to list active frontends"))]
59 ListActiveFrontends {
60 #[snafu(implicit)]
61 location: Location,
62 source: common_meta::error::Error,
63 },
64
65 #[snafu(display("Failed to list active datanodes"))]
66 ListActiveDatanodes {
67 #[snafu(implicit)]
68 location: Location,
69 source: common_meta::error::Error,
70 },
71
72 #[snafu(display("Failed to list active flownodes"))]
73 ListActiveFlownodes {
74 #[snafu(implicit)]
75 location: Location,
76 source: common_meta::error::Error,
77 },
78
79 #[snafu(display("No available frontend"))]
80 NoAvailableFrontend {
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Another migration procedure is running for region: {}", region_id))]
86 MigrationRunning {
87 #[snafu(implicit)]
88 location: Location,
89 region_id: RegionId,
90 },
91
92 #[snafu(display(
93 "The region migration procedure is completed for region: {}, target_peer: {}",
94 region_id,
95 target_peer_id
96 ))]
97 RegionMigrated {
98 #[snafu(implicit)]
99 location: Location,
100 region_id: RegionId,
101 target_peer_id: u64,
102 },
103
104 #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
105 MigrationAbort {
106 #[snafu(implicit)]
107 location: Location,
108 reason: String,
109 },
110
111 #[snafu(display(
112 "Another procedure is opening the region: {} on peer: {}",
113 region_id,
114 peer_id
115 ))]
116 RegionOpeningRace {
117 #[snafu(implicit)]
118 location: Location,
119 peer_id: DatanodeId,
120 region_id: RegionId,
121 },
122
123 #[snafu(display("Failed to init ddl manager"))]
124 InitDdlManager {
125 #[snafu(implicit)]
126 location: Location,
127 source: common_meta::error::Error,
128 },
129
130 #[snafu(display("Failed to init reconciliation manager"))]
131 InitReconciliationManager {
132 #[snafu(implicit)]
133 location: Location,
134 source: common_meta::error::Error,
135 },
136
137 #[snafu(display("Failed to create default catalog and schema"))]
138 InitMetadata {
139 #[snafu(implicit)]
140 location: Location,
141 source: common_meta::error::Error,
142 },
143
144 #[snafu(display("Failed to allocate next sequence number"))]
145 NextSequence {
146 #[snafu(implicit)]
147 location: Location,
148 source: common_meta::error::Error,
149 },
150
151 #[snafu(display("Failed to set next sequence number"))]
152 SetNextSequence {
153 #[snafu(implicit)]
154 location: Location,
155 source: common_meta::error::Error,
156 },
157
158 #[snafu(display("Failed to peek sequence number"))]
159 PeekSequence {
160 #[snafu(implicit)]
161 location: Location,
162 source: common_meta::error::Error,
163 },
164
165 #[snafu(display("Failed to start telemetry task"))]
166 StartTelemetryTask {
167 #[snafu(implicit)]
168 location: Location,
169 source: common_runtime::error::Error,
170 },
171
172 #[snafu(display("Failed to submit ddl task"))]
173 SubmitDdlTask {
174 #[snafu(implicit)]
175 location: Location,
176 source: common_meta::error::Error,
177 },
178
179 #[snafu(display("Failed to submit reconcile procedure"))]
180 SubmitReconcileProcedure {
181 #[snafu(implicit)]
182 location: Location,
183 source: common_meta::error::Error,
184 },
185
186 #[snafu(display("Failed to invalidate table cache"))]
187 InvalidateTableCache {
188 #[snafu(implicit)]
189 location: Location,
190 source: common_meta::error::Error,
191 },
192
193 #[snafu(display("Failed to list catalogs"))]
194 ListCatalogs {
195 #[snafu(implicit)]
196 location: Location,
197 source: BoxedError,
198 },
199
200 #[snafu(display("Failed to list {}'s schemas", catalog))]
201 ListSchemas {
202 #[snafu(implicit)]
203 location: Location,
204 catalog: String,
205 source: BoxedError,
206 },
207
208 #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
209 ListTables {
210 #[snafu(implicit)]
211 location: Location,
212 catalog: String,
213 schema: String,
214 source: BoxedError,
215 },
216
217 #[snafu(display("Failed to join a future"))]
218 Join {
219 #[snafu(implicit)]
220 location: Location,
221 #[snafu(source)]
222 error: JoinError,
223 },
224
225 #[snafu(display(
226 "Failed to request {}, required: {}, but only {} available",
227 select_target,
228 required,
229 available
230 ))]
231 NoEnoughAvailableNode {
232 #[snafu(implicit)]
233 location: Location,
234 required: usize,
235 available: usize,
236 select_target: SelectTarget,
237 },
238
239 #[snafu(display("Failed to send shutdown signal"))]
240 SendShutdownSignal {
241 #[snafu(source)]
242 error: SendError<()>,
243 },
244
245 #[snafu(display("Failed to shutdown {} server", server))]
246 ShutdownServer {
247 #[snafu(implicit)]
248 location: Location,
249 source: servers::error::Error,
250 server: String,
251 },
252
253 #[snafu(display("Empty key is not allowed"))]
254 EmptyKey {
255 #[snafu(implicit)]
256 location: Location,
257 },
258
259 #[snafu(display("Failed to execute via Etcd"))]
260 EtcdFailed {
261 #[snafu(source)]
262 error: etcd_client::Error,
263 #[snafu(implicit)]
264 location: Location,
265 },
266
267 #[snafu(display("Failed to connect to Etcd"))]
268 ConnectEtcd {
269 #[snafu(source)]
270 error: etcd_client::Error,
271 #[snafu(implicit)]
272 location: Location,
273 },
274
275 #[snafu(display("Failed to read file: {}", path))]
276 FileIo {
277 #[snafu(source)]
278 error: std::io::Error,
279 #[snafu(implicit)]
280 location: Location,
281 path: String,
282 },
283
284 #[snafu(display("Failed to bind address {}", addr))]
285 TcpBind {
286 addr: String,
287 #[snafu(source)]
288 error: std::io::Error,
289 #[snafu(implicit)]
290 location: Location,
291 },
292
293 #[snafu(display("Failed to start gRPC server"))]
294 StartGrpc {
295 #[snafu(source)]
296 error: tonic::transport::Error,
297 #[snafu(implicit)]
298 location: Location,
299 },
300
301 #[snafu(display("Failed to start http server"))]
302 StartHttp {
303 #[snafu(implicit)]
304 location: Location,
305 source: servers::error::Error,
306 },
307
308 #[snafu(display("Failed to parse address {}", addr))]
309 ParseAddr {
310 addr: String,
311 #[snafu(source)]
312 error: std::net::AddrParseError,
313 },
314
315 #[snafu(display("Invalid lease key: {}", key))]
316 InvalidLeaseKey {
317 key: String,
318 #[snafu(implicit)]
319 location: Location,
320 },
321
322 #[snafu(display("Invalid datanode stat key: {}", key))]
323 InvalidStatKey {
324 key: String,
325 #[snafu(implicit)]
326 location: Location,
327 },
328
329 #[snafu(display("Invalid inactive region key: {}", key))]
330 InvalidInactiveRegionKey {
331 key: String,
332 #[snafu(implicit)]
333 location: Location,
334 },
335
336 #[snafu(display("Failed to parse lease key from utf8"))]
337 LeaseKeyFromUtf8 {
338 #[snafu(source)]
339 error: std::string::FromUtf8Error,
340 #[snafu(implicit)]
341 location: Location,
342 },
343
344 #[snafu(display("Failed to parse lease value from utf8"))]
345 LeaseValueFromUtf8 {
346 #[snafu(source)]
347 error: std::string::FromUtf8Error,
348 #[snafu(implicit)]
349 location: Location,
350 },
351
352 #[snafu(display("Failed to parse invalid region key from utf8"))]
353 InvalidRegionKeyFromUtf8 {
354 #[snafu(source)]
355 error: std::string::FromUtf8Error,
356 #[snafu(implicit)]
357 location: Location,
358 },
359
360 #[snafu(display("Failed to serialize to json: {}", input))]
361 SerializeToJson {
362 input: String,
363 #[snafu(source)]
364 error: serde_json::error::Error,
365 #[snafu(implicit)]
366 location: Location,
367 },
368
369 #[snafu(display("Failed to deserialize from json: {}", input))]
370 DeserializeFromJson {
371 input: String,
372 #[snafu(source)]
373 error: serde_json::error::Error,
374 #[snafu(implicit)]
375 location: Location,
376 },
377
378 #[snafu(display("Failed to parse number: {}", err_msg))]
379 ParseNum {
380 err_msg: String,
381 #[snafu(source)]
382 error: std::num::ParseIntError,
383 #[snafu(implicit)]
384 location: Location,
385 },
386
387 #[snafu(display("Failed to parse bool: {}", err_msg))]
388 ParseBool {
389 err_msg: String,
390 #[snafu(source)]
391 error: std::str::ParseBoolError,
392 #[snafu(implicit)]
393 location: Location,
394 },
395
396 #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
397 DowngradeLeader {
398 region_id: RegionId,
399 #[snafu(implicit)]
400 location: Location,
401 #[snafu(source)]
402 source: BoxedError,
403 },
404
405 #[snafu(display("Region's leader peer changed: {}", msg))]
406 LeaderPeerChanged {
407 msg: String,
408 #[snafu(implicit)]
409 location: Location,
410 },
411
412 #[snafu(display("Invalid arguments: {}", err_msg))]
413 InvalidArguments {
414 err_msg: String,
415 #[snafu(implicit)]
416 location: Location,
417 },
418
419 #[cfg(feature = "mysql_kvbackend")]
420 #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
421 ParseMySqlUrl {
422 #[snafu(source)]
423 error: sqlx::error::Error,
424 mysql_url: String,
425 #[snafu(implicit)]
426 location: Location,
427 },
428
429 #[cfg(feature = "mysql_kvbackend")]
430 #[snafu(display("Failed to decode sql value"))]
431 DecodeSqlValue {
432 #[snafu(source)]
433 error: sqlx::error::Error,
434 #[snafu(implicit)]
435 location: Location,
436 },
437
438 #[snafu(display("Failed to find table route for {table_id}"))]
439 TableRouteNotFound {
440 table_id: TableId,
441 #[snafu(implicit)]
442 location: Location,
443 },
444
445 #[snafu(display("Failed to find table route for {region_id}"))]
446 RegionRouteNotFound {
447 region_id: RegionId,
448 #[snafu(implicit)]
449 location: Location,
450 },
451
452 #[snafu(display("Table info not found: {}", table_id))]
453 TableInfoNotFound {
454 table_id: TableId,
455 #[snafu(implicit)]
456 location: Location,
457 },
458
459 #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
460 DatanodeTableNotFound {
461 table_id: TableId,
462 datanode_id: DatanodeId,
463 #[snafu(implicit)]
464 location: Location,
465 },
466
467 #[snafu(display("Metasrv has no leader at this moment"))]
468 NoLeader {
469 #[snafu(implicit)]
470 location: Location,
471 },
472
473 #[snafu(display("Leader lease expired"))]
474 LeaderLeaseExpired {
475 #[snafu(implicit)]
476 location: Location,
477 },
478
479 #[snafu(display("Leader lease changed during election"))]
480 LeaderLeaseChanged {
481 #[snafu(implicit)]
482 location: Location,
483 },
484
485 #[snafu(display("Table {} not found", name))]
486 TableNotFound {
487 name: String,
488 #[snafu(implicit)]
489 location: Location,
490 },
491
492 #[snafu(display("Unsupported selector type, {}", selector_type))]
493 UnsupportedSelectorType {
494 selector_type: String,
495 #[snafu(implicit)]
496 location: Location,
497 },
498
499 #[snafu(display("Unexpected, violated: {violated}"))]
500 Unexpected {
501 violated: String,
502 #[snafu(implicit)]
503 location: Location,
504 },
505
506 #[snafu(display("Failed to create gRPC channel"))]
507 CreateChannel {
508 #[snafu(implicit)]
509 location: Location,
510 source: common_grpc::error::Error,
511 },
512
513 #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
514 BatchGet {
515 #[snafu(source)]
516 error: tonic::Status,
517 #[snafu(implicit)]
518 location: Location,
519 },
520
521 #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
522 Range {
523 #[snafu(source)]
524 error: tonic::Status,
525 #[snafu(implicit)]
526 location: Location,
527 },
528
529 #[snafu(display("Response header not found"))]
530 ResponseHeaderNotFound {
531 #[snafu(implicit)]
532 location: Location,
533 },
534
535 #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
536 IsNotLeader {
537 node_addr: String,
538 #[snafu(implicit)]
539 location: Location,
540 },
541
542 #[snafu(display("Invalid http body"))]
543 InvalidHttpBody {
544 #[snafu(source)]
545 error: http::Error,
546 #[snafu(implicit)]
547 location: Location,
548 },
549
550 #[snafu(display(
551 "The number of retries for the grpc call {} exceeded the limit, {}",
552 func_name,
553 retry_num
554 ))]
555 ExceededRetryLimit {
556 func_name: String,
557 retry_num: usize,
558 #[snafu(implicit)]
559 location: Location,
560 },
561
562 #[snafu(display("Invalid utf-8 value"))]
563 InvalidUtf8Value {
564 #[snafu(source)]
565 error: std::string::FromUtf8Error,
566 #[snafu(implicit)]
567 location: Location,
568 },
569
570 #[snafu(display("Missing required parameter, param: {:?}", param))]
571 MissingRequiredParameter { param: String },
572
573 #[snafu(display("Failed to start procedure manager"))]
574 StartProcedureManager {
575 #[snafu(implicit)]
576 location: Location,
577 source: common_procedure::Error,
578 },
579
580 #[snafu(display("Failed to stop procedure manager"))]
581 StopProcedureManager {
582 #[snafu(implicit)]
583 location: Location,
584 source: common_procedure::Error,
585 },
586
587 #[snafu(display("Failed to wait procedure done"))]
588 WaitProcedure {
589 #[snafu(implicit)]
590 location: Location,
591 source: common_procedure::Error,
592 },
593
594 #[snafu(display("Failed to query procedure state"))]
595 QueryProcedure {
596 #[snafu(implicit)]
597 location: Location,
598 source: common_procedure::Error,
599 },
600
601 #[snafu(display("Procedure not found: {pid}"))]
602 ProcedureNotFound {
603 #[snafu(implicit)]
604 location: Location,
605 pid: String,
606 },
607
608 #[snafu(display("Failed to submit procedure"))]
609 SubmitProcedure {
610 #[snafu(implicit)]
611 location: Location,
612 source: common_procedure::Error,
613 },
614
615 #[snafu(display("A prune task for topic {} is already running", topic))]
616 PruneTaskAlreadyRunning {
617 topic: String,
618 #[snafu(implicit)]
619 location: Location,
620 },
621
622 #[snafu(display("Schema already exists, name: {schema_name}"))]
623 SchemaAlreadyExists {
624 schema_name: String,
625 #[snafu(implicit)]
626 location: Location,
627 },
628
629 #[snafu(display("Table already exists: {table_name}"))]
630 TableAlreadyExists {
631 table_name: String,
632 #[snafu(implicit)]
633 location: Location,
634 },
635
636 #[snafu(display("Pusher not found: {pusher_id}"))]
637 PusherNotFound {
638 pusher_id: String,
639 #[snafu(implicit)]
640 location: Location,
641 },
642
643 #[snafu(display("Failed to push message: {err_msg}"))]
644 PushMessage {
645 err_msg: String,
646 #[snafu(implicit)]
647 location: Location,
648 },
649
650 #[snafu(display("Mailbox already closed: {id}"))]
651 MailboxClosed {
652 id: u64,
653 #[snafu(implicit)]
654 location: Location,
655 },
656
657 #[snafu(display("Mailbox timeout: {id}"))]
658 MailboxTimeout {
659 id: u64,
660 #[snafu(implicit)]
661 location: Location,
662 },
663
664 #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
665 MailboxReceiver {
666 id: u64,
667 err_msg: String,
668 #[snafu(implicit)]
669 location: Location,
670 },
671
672 #[snafu(display("Mailbox channel closed: {channel}"))]
673 MailboxChannelClosed {
674 channel: Channel,
675 #[snafu(implicit)]
676 location: Location,
677 },
678
679 #[snafu(display("Missing request header"))]
680 MissingRequestHeader {
681 #[snafu(implicit)]
682 location: Location,
683 },
684
685 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
686 RegisterProcedureLoader {
687 type_name: String,
688 #[snafu(implicit)]
689 location: Location,
690 source: common_procedure::error::Error,
691 },
692
693 #[snafu(display(
694 "Received unexpected instruction reply, mailbox message: {}, reason: {}",
695 mailbox_message,
696 reason
697 ))]
698 UnexpectedInstructionReply {
699 mailbox_message: String,
700 reason: String,
701 #[snafu(implicit)]
702 location: Location,
703 },
704
705 #[snafu(display("Expected to retry later, reason: {}", reason))]
706 RetryLater {
707 reason: String,
708 #[snafu(implicit)]
709 location: Location,
710 },
711
712 #[snafu(display("Expected to retry later, reason: {}", reason))]
713 RetryLaterWithSource {
714 reason: String,
715 #[snafu(implicit)]
716 location: Location,
717 source: BoxedError,
718 },
719
720 #[snafu(display("Failed to convert proto data"))]
721 ConvertProtoData {
722 #[snafu(implicit)]
723 location: Location,
724 source: common_meta::error::Error,
725 },
726
727 #[snafu(display("Other error"))]
730 Other {
731 source: BoxedError,
732 #[snafu(implicit)]
733 location: Location,
734 },
735
736 #[snafu(display("Table metadata manager error"))]
737 TableMetadataManager {
738 source: common_meta::error::Error,
739 #[snafu(implicit)]
740 location: Location,
741 },
742
743 #[snafu(display("Runtime switch manager error"))]
744 RuntimeSwitchManager {
745 source: common_meta::error::Error,
746 #[snafu(implicit)]
747 location: Location,
748 },
749
750 #[snafu(display("Keyvalue backend error"))]
751 KvBackend {
752 source: common_meta::error::Error,
753 #[snafu(implicit)]
754 location: Location,
755 },
756
757 #[snafu(display("Failed to publish message"))]
758 PublishMessage {
759 #[snafu(source)]
760 error: SendError<Message>,
761 #[snafu(implicit)]
762 location: Location,
763 },
764
765 #[snafu(display("Too many partitions"))]
766 TooManyPartitions {
767 #[snafu(implicit)]
768 location: Location,
769 },
770
771 #[snafu(display("Unsupported operation {}", operation))]
772 Unsupported {
773 operation: String,
774 #[snafu(implicit)]
775 location: Location,
776 },
777
778 #[snafu(display("Unexpected table route type: {}", err_msg))]
779 UnexpectedLogicalRouteTable {
780 #[snafu(implicit)]
781 location: Location,
782 err_msg: String,
783 source: common_meta::error::Error,
784 },
785
786 #[snafu(display("Failed to save cluster info"))]
787 SaveClusterInfo {
788 #[snafu(implicit)]
789 location: Location,
790 source: common_meta::error::Error,
791 },
792
793 #[snafu(display("Invalid cluster info format"))]
794 InvalidClusterInfoFormat {
795 #[snafu(implicit)]
796 location: Location,
797 source: common_meta::error::Error,
798 },
799
800 #[snafu(display("Invalid datanode stat format"))]
801 InvalidDatanodeStatFormat {
802 #[snafu(implicit)]
803 location: Location,
804 source: common_meta::error::Error,
805 },
806
807 #[snafu(display("Invalid node info format"))]
808 InvalidNodeInfoFormat {
809 #[snafu(implicit)]
810 location: Location,
811 source: common_meta::error::Error,
812 },
813
814 #[snafu(display("Failed to serialize options to TOML"))]
815 TomlFormat {
816 #[snafu(implicit)]
817 location: Location,
818 #[snafu(source(from(common_config::error::Error, Box::new)))]
819 source: Box<common_config::error::Error>,
820 },
821
822 #[cfg(feature = "pg_kvbackend")]
823 #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
824 PostgresExecution {
825 #[snafu(source)]
826 error: tokio_postgres::Error,
827 sql: String,
828 #[snafu(implicit)]
829 location: Location,
830 },
831
832 #[cfg(feature = "pg_kvbackend")]
833 #[snafu(display("Failed to get Postgres client"))]
834 GetPostgresClient {
835 #[snafu(implicit)]
836 location: Location,
837 #[snafu(source)]
838 error: deadpool::managed::PoolError<tokio_postgres::Error>,
839 },
840
841 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
842 #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
843 SqlExecutionTimeout {
844 #[snafu(implicit)]
845 location: Location,
846 sql: String,
847 duration: std::time::Duration,
848 },
849
850 #[cfg(feature = "pg_kvbackend")]
851 #[snafu(display("Failed to create connection pool for Postgres"))]
852 CreatePostgresPool {
853 #[snafu(source)]
854 error: deadpool_postgres::CreatePoolError,
855 #[snafu(implicit)]
856 location: Location,
857 },
858
859 #[cfg(feature = "pg_kvbackend")]
860 #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
861 GetPostgresConnection {
862 reason: String,
863 #[snafu(implicit)]
864 location: Location,
865 },
866
867 #[cfg(feature = "mysql_kvbackend")]
868 #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
869 MySqlExecution {
870 #[snafu(source)]
871 error: sqlx::Error,
872 #[snafu(implicit)]
873 location: Location,
874 sql: String,
875 },
876
877 #[cfg(feature = "mysql_kvbackend")]
878 #[snafu(display("Failed to create mysql pool"))]
879 CreateMySqlPool {
880 #[snafu(source)]
881 error: sqlx::Error,
882 #[snafu(implicit)]
883 location: Location,
884 },
885
886 #[cfg(feature = "mysql_kvbackend")]
887 #[snafu(display("Failed to acquire mysql client from pool"))]
888 AcquireMySqlClient {
889 #[snafu(source)]
890 error: sqlx::Error,
891 #[snafu(implicit)]
892 location: Location,
893 },
894
895 #[snafu(display("Handler not found: {}", name))]
896 HandlerNotFound {
897 name: String,
898 #[snafu(implicit)]
899 location: Location,
900 },
901
902 #[snafu(display("Flow state handler error"))]
903 FlowStateHandler {
904 #[snafu(implicit)]
905 location: Location,
906 source: common_meta::error::Error,
907 },
908
909 #[snafu(display("Failed to build wal options allocator"))]
910 BuildWalOptionsAllocator {
911 #[snafu(implicit)]
912 location: Location,
913 source: common_meta::error::Error,
914 },
915
916 #[snafu(display("Failed to parse wal options"))]
917 ParseWalOptions {
918 #[snafu(implicit)]
919 location: Location,
920 source: common_meta::error::Error,
921 },
922
923 #[snafu(display("Failed to build kafka client."))]
924 BuildKafkaClient {
925 #[snafu(implicit)]
926 location: Location,
927 #[snafu(source)]
928 error: common_meta::error::Error,
929 },
930
931 #[snafu(display(
932 "Failed to build a Kafka partition client, topic: {}, partition: {}",
933 topic,
934 partition
935 ))]
936 BuildPartitionClient {
937 topic: String,
938 partition: i32,
939 #[snafu(implicit)]
940 location: Location,
941 #[snafu(source)]
942 error: rskafka::client::error::Error,
943 },
944
945 #[snafu(display(
946 "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
947 topic,
948 partition,
949 offset
950 ))]
951 DeleteRecords {
952 #[snafu(implicit)]
953 location: Location,
954 #[snafu(source)]
955 error: rskafka::client::error::Error,
956 topic: String,
957 partition: i32,
958 offset: u64,
959 },
960
961 #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
962 GetOffset {
963 topic: String,
964 #[snafu(implicit)]
965 location: Location,
966 #[snafu(source)]
967 error: rskafka::client::error::Error,
968 },
969
970 #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
971 UpdateTopicNameValue {
972 topic: String,
973 #[snafu(implicit)]
974 location: Location,
975 #[snafu(source)]
976 source: common_meta::error::Error,
977 },
978
979 #[snafu(display("Failed to build tls options"))]
980 BuildTlsOptions {
981 #[snafu(implicit)]
982 location: Location,
983 #[snafu(source)]
984 source: common_meta::error::Error,
985 },
986
987 #[snafu(display(
988 "Repartition group {} source region missing, region id: {}",
989 group_id,
990 region_id
991 ))]
992 RepartitionSourceRegionMissing {
993 group_id: Uuid,
994 region_id: RegionId,
995 #[snafu(implicit)]
996 location: Location,
997 },
998
999 #[snafu(display(
1000 "Repartition group {} target region missing, region id: {}",
1001 group_id,
1002 region_id
1003 ))]
1004 RepartitionTargetRegionMissing {
1005 group_id: Uuid,
1006 region_id: RegionId,
1007 #[snafu(implicit)]
1008 location: Location,
1009 },
1010
1011 #[snafu(display("Failed to serialize partition expression: {}", source))]
1012 SerializePartitionExpr {
1013 #[snafu(source)]
1014 source: partition::error::Error,
1015 #[snafu(implicit)]
1016 location: Location,
1017 },
1018
1019 #[snafu(display(
1020 "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1021 region_id,
1022 expected,
1023 actual
1024 ))]
1025 PartitionExprMismatch {
1026 region_id: RegionId,
1027 expected: String,
1028 actual: String,
1029 #[snafu(implicit)]
1030 location: Location,
1031 },
1032}
1033
1034impl Error {
1035 pub fn is_retryable(&self) -> bool {
1037 matches!(
1038 self,
1039 Error::RetryLater { .. }
1040 | Error::RetryLaterWithSource { .. }
1041 | Error::MailboxTimeout { .. }
1042 )
1043 }
1044}
1045
/// Result alias whose error type is fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
1047
// NOTE(review): judging by the macro name, this presumably generates the
// conversion from `Error` into `tonic::Status` — confirm in `common_error`.
define_into_tonic_status!(Error);
1049
1050impl ErrorExt for Error {
1051 fn status_code(&self) -> StatusCode {
1052 match self {
1053 Error::EtcdFailed { .. }
1054 | Error::ConnectEtcd { .. }
1055 | Error::FileIo { .. }
1056 | Error::TcpBind { .. }
1057 | Error::SerializeToJson { .. }
1058 | Error::DeserializeFromJson { .. }
1059 | Error::NoLeader { .. }
1060 | Error::LeaderLeaseExpired { .. }
1061 | Error::LeaderLeaseChanged { .. }
1062 | Error::CreateChannel { .. }
1063 | Error::BatchGet { .. }
1064 | Error::Range { .. }
1065 | Error::ResponseHeaderNotFound { .. }
1066 | Error::InvalidHttpBody { .. }
1067 | Error::ExceededRetryLimit { .. }
1068 | Error::SendShutdownSignal { .. }
1069 | Error::PushMessage { .. }
1070 | Error::MailboxClosed { .. }
1071 | Error::MailboxReceiver { .. }
1072 | Error::StartGrpc { .. }
1073 | Error::PublishMessage { .. }
1074 | Error::Join { .. }
1075 | Error::ChooseItems { .. }
1076 | Error::FlowStateHandler { .. }
1077 | Error::BuildWalOptionsAllocator { .. }
1078 | Error::BuildPartitionClient { .. }
1079 | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1080
1081 Error::DeleteRecords { .. }
1082 | Error::GetOffset { .. }
1083 | Error::PeerUnavailable { .. }
1084 | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1085 Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1086 Error::PruneTaskAlreadyRunning { .. }
1087 | Error::RetryLater { .. }
1088 | Error::MailboxChannelClosed { .. }
1089 | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1090 Error::RetryLaterWithSource { source, .. } => source.status_code(),
1091 Error::SerializePartitionExpr { source, .. } => source.status_code(),
1092
1093 Error::Unsupported { .. } => StatusCode::Unsupported,
1094
1095 Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1096
1097 Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1098 Error::EmptyKey { .. }
1099 | Error::MissingRequiredParameter { .. }
1100 | Error::MissingRequestHeader { .. }
1101 | Error::InvalidLeaseKey { .. }
1102 | Error::InvalidStatKey { .. }
1103 | Error::InvalidInactiveRegionKey { .. }
1104 | Error::ParseNum { .. }
1105 | Error::ParseBool { .. }
1106 | Error::ParseAddr { .. }
1107 | Error::UnsupportedSelectorType { .. }
1108 | Error::InvalidArguments { .. }
1109 | Error::ProcedureNotFound { .. }
1110 | Error::TooManyPartitions { .. }
1111 | Error::TomlFormat { .. }
1112 | Error::HandlerNotFound { .. }
1113 | Error::LeaderPeerChanged { .. }
1114 | Error::RepartitionSourceRegionMissing { .. }
1115 | Error::RepartitionTargetRegionMissing { .. }
1116 | Error::PartitionExprMismatch { .. } => StatusCode::InvalidArguments,
1117 Error::LeaseKeyFromUtf8 { .. }
1118 | Error::LeaseValueFromUtf8 { .. }
1119 | Error::InvalidRegionKeyFromUtf8 { .. }
1120 | Error::TableRouteNotFound { .. }
1121 | Error::TableInfoNotFound { .. }
1122 | Error::DatanodeTableNotFound { .. }
1123 | Error::InvalidUtf8Value { .. }
1124 | Error::UnexpectedInstructionReply { .. }
1125 | Error::Unexpected { .. }
1126 | Error::RegionOpeningRace { .. }
1127 | Error::RegionRouteNotFound { .. }
1128 | Error::MigrationAbort { .. }
1129 | Error::MigrationRunning { .. }
1130 | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1131 Error::TableNotFound { .. } => StatusCode::TableNotFound,
1132 Error::SaveClusterInfo { source, .. }
1133 | Error::InvalidClusterInfoFormat { source, .. }
1134 | Error::InvalidDatanodeStatFormat { source, .. }
1135 | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1136 Error::InvalidateTableCache { source, .. } => source.status_code(),
1137 Error::SubmitProcedure { source, .. }
1138 | Error::WaitProcedure { source, .. }
1139 | Error::QueryProcedure { source, .. } => source.status_code(),
1140 Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1141 source.status_code()
1142 }
1143 Error::StartProcedureManager { source, .. }
1144 | Error::StopProcedureManager { source, .. } => source.status_code(),
1145
1146 Error::ListCatalogs { source, .. }
1147 | Error::ListSchemas { source, .. }
1148 | Error::ListTables { source, .. } => source.status_code(),
1149 Error::StartTelemetryTask { source, .. } => source.status_code(),
1150
1151 Error::NextSequence { source, .. }
1152 | Error::SetNextSequence { source, .. }
1153 | Error::PeekSequence { source, .. } => source.status_code(),
1154 Error::DowngradeLeader { source, .. } => source.status_code(),
1155 Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1156 Error::SubmitDdlTask { source, .. }
1157 | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1158 Error::ConvertProtoData { source, .. }
1159 | Error::TableMetadataManager { source, .. }
1160 | Error::RuntimeSwitchManager { source, .. }
1161 | Error::KvBackend { source, .. }
1162 | Error::UnexpectedLogicalRouteTable { source, .. }
1163 | Error::UpdateTopicNameValue { source, .. }
1164 | Error::ParseWalOptions { source, .. } => source.status_code(),
1165 Error::ListActiveFrontends { source, .. }
1166 | Error::ListActiveDatanodes { source, .. }
1167 | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1168 Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1169
1170 Error::InitMetadata { source, .. }
1171 | Error::InitDdlManager { source, .. }
1172 | Error::InitReconciliationManager { source, .. } => source.status_code(),
1173
1174 Error::BuildTlsOptions { source, .. } => source.status_code(),
1175 Error::Other { source, .. } => source.status_code(),
1176 Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1177
1178 #[cfg(feature = "pg_kvbackend")]
1179 Error::CreatePostgresPool { .. }
1180 | Error::GetPostgresClient { .. }
1181 | Error::GetPostgresConnection { .. }
1182 | Error::PostgresExecution { .. } => StatusCode::Internal,
1183 #[cfg(feature = "mysql_kvbackend")]
1184 Error::MySqlExecution { .. }
1185 | Error::CreateMySqlPool { .. }
1186 | Error::ParseMySqlUrl { .. }
1187 | Error::DecodeSqlValue { .. }
1188 | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1189 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1190 Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1191 }
1192 }
1193
1194 fn as_any(&self) -> &dyn std::any::Any {
1195 self
1196 }
1197}
1198
1199pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1201 let mut err: &(dyn std::error::Error + 'static) = err_status;
1202
1203 loop {
1204 if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1205 return Some(io_err);
1206 }
1207
1208 if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1211 && let Some(io_err) = h2_err.get_io()
1212 {
1213 return Some(io_err);
1214 }
1215
1216 err = err.source()?;
1217 }
1218}