meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Failed to choose items"))]
36    ChooseItems {
37        #[snafu(implicit)]
38        location: Location,
39        #[snafu(source)]
40        error: rand::distr::weighted::Error,
41    },
42
43    #[snafu(display("Exceeded deadline, operation: {}", operation))]
44    ExceededDeadline {
45        #[snafu(implicit)]
46        location: Location,
47        operation: String,
48    },
49
50    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51    PeerUnavailable {
52        #[snafu(implicit)]
53        location: Location,
54        peer_id: u64,
55    },
56
57    #[snafu(display("Failed to list active frontends"))]
58    ListActiveFrontends {
59        #[snafu(implicit)]
60        location: Location,
61        source: common_meta::error::Error,
62    },
63
64    #[snafu(display("Failed to list active datanodes"))]
65    ListActiveDatanodes {
66        #[snafu(implicit)]
67        location: Location,
68        source: common_meta::error::Error,
69    },
70
71    #[snafu(display("Failed to list active flownodes"))]
72    ListActiveFlownodes {
73        #[snafu(implicit)]
74        location: Location,
75        source: common_meta::error::Error,
76    },
77
78    #[snafu(display("No available frontend"))]
79    NoAvailableFrontend {
80        #[snafu(implicit)]
81        location: Location,
82    },
83
84    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
85    MigrationRunning {
86        #[snafu(implicit)]
87        location: Location,
88        region_id: RegionId,
89    },
90
91    #[snafu(display(
92        "The region migration procedure is completed for region: {}, target_peer: {}",
93        region_id,
94        target_peer_id
95    ))]
96    RegionMigrated {
97        #[snafu(implicit)]
98        location: Location,
99        region_id: RegionId,
100        target_peer_id: u64,
101    },
102
103    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
104    MigrationAbort {
105        #[snafu(implicit)]
106        location: Location,
107        reason: String,
108    },
109
110    #[snafu(display(
111        "Another procedure is opening the region: {} on peer: {}",
112        region_id,
113        peer_id
114    ))]
115    RegionOpeningRace {
116        #[snafu(implicit)]
117        location: Location,
118        peer_id: DatanodeId,
119        region_id: RegionId,
120    },
121
122    #[snafu(display("Failed to init ddl manager"))]
123    InitDdlManager {
124        #[snafu(implicit)]
125        location: Location,
126        source: common_meta::error::Error,
127    },
128
129    #[snafu(display("Failed to init reconciliation manager"))]
130    InitReconciliationManager {
131        #[snafu(implicit)]
132        location: Location,
133        source: common_meta::error::Error,
134    },
135
136    #[snafu(display("Failed to create default catalog and schema"))]
137    InitMetadata {
138        #[snafu(implicit)]
139        location: Location,
140        source: common_meta::error::Error,
141    },
142
143    #[snafu(display("Failed to allocate next sequence number"))]
144    NextSequence {
145        #[snafu(implicit)]
146        location: Location,
147        source: common_meta::error::Error,
148    },
149
150    #[snafu(display("Failed to set next sequence number"))]
151    SetNextSequence {
152        #[snafu(implicit)]
153        location: Location,
154        source: common_meta::error::Error,
155    },
156
157    #[snafu(display("Failed to peek sequence number"))]
158    PeekSequence {
159        #[snafu(implicit)]
160        location: Location,
161        source: common_meta::error::Error,
162    },
163
164    #[snafu(display("Failed to start telemetry task"))]
165    StartTelemetryTask {
166        #[snafu(implicit)]
167        location: Location,
168        source: common_runtime::error::Error,
169    },
170
171    #[snafu(display("Failed to submit ddl task"))]
172    SubmitDdlTask {
173        #[snafu(implicit)]
174        location: Location,
175        source: common_meta::error::Error,
176    },
177
178    #[snafu(display("Failed to submit reconcile procedure"))]
179    SubmitReconcileProcedure {
180        #[snafu(implicit)]
181        location: Location,
182        source: common_meta::error::Error,
183    },
184
185    #[snafu(display("Failed to invalidate table cache"))]
186    InvalidateTableCache {
187        #[snafu(implicit)]
188        location: Location,
189        source: common_meta::error::Error,
190    },
191
192    #[snafu(display("Failed to list catalogs"))]
193    ListCatalogs {
194        #[snafu(implicit)]
195        location: Location,
196        source: BoxedError,
197    },
198
199    #[snafu(display("Failed to list {}'s schemas", catalog))]
200    ListSchemas {
201        #[snafu(implicit)]
202        location: Location,
203        catalog: String,
204        source: BoxedError,
205    },
206
207    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
208    ListTables {
209        #[snafu(implicit)]
210        location: Location,
211        catalog: String,
212        schema: String,
213        source: BoxedError,
214    },
215
216    #[snafu(display("Failed to join a future"))]
217    Join {
218        #[snafu(implicit)]
219        location: Location,
220        #[snafu(source)]
221        error: JoinError,
222    },
223
224    #[snafu(display(
225        "Failed to request {}, required: {}, but only {} available",
226        select_target,
227        required,
228        available
229    ))]
230    NoEnoughAvailableNode {
231        #[snafu(implicit)]
232        location: Location,
233        required: usize,
234        available: usize,
235        select_target: SelectTarget,
236    },
237
238    #[snafu(display("Failed to send shutdown signal"))]
239    SendShutdownSignal {
240        #[snafu(source)]
241        error: SendError<()>,
242    },
243
244    #[snafu(display("Failed to shutdown {} server", server))]
245    ShutdownServer {
246        #[snafu(implicit)]
247        location: Location,
248        source: servers::error::Error,
249        server: String,
250    },
251
252    #[snafu(display("Empty key is not allowed"))]
253    EmptyKey {
254        #[snafu(implicit)]
255        location: Location,
256    },
257
258    #[snafu(display("Failed to execute via Etcd"))]
259    EtcdFailed {
260        #[snafu(source)]
261        error: etcd_client::Error,
262        #[snafu(implicit)]
263        location: Location,
264    },
265
266    #[snafu(display("Failed to connect to Etcd"))]
267    ConnectEtcd {
268        #[snafu(source)]
269        error: etcd_client::Error,
270        #[snafu(implicit)]
271        location: Location,
272    },
273
274    #[snafu(display("Failed to read file: {}", path))]
275    FileIo {
276        #[snafu(source)]
277        error: std::io::Error,
278        #[snafu(implicit)]
279        location: Location,
280        path: String,
281    },
282
283    #[snafu(display("Failed to bind address {}", addr))]
284    TcpBind {
285        addr: String,
286        #[snafu(source)]
287        error: std::io::Error,
288        #[snafu(implicit)]
289        location: Location,
290    },
291
292    #[snafu(display("Failed to start gRPC server"))]
293    StartGrpc {
294        #[snafu(source)]
295        error: tonic::transport::Error,
296        #[snafu(implicit)]
297        location: Location,
298    },
299
300    #[snafu(display("Failed to start http server"))]
301    StartHttp {
302        #[snafu(implicit)]
303        location: Location,
304        source: servers::error::Error,
305    },
306
307    #[snafu(display("Failed to parse address {}", addr))]
308    ParseAddr {
309        addr: String,
310        #[snafu(source)]
311        error: std::net::AddrParseError,
312    },
313
314    #[snafu(display("Invalid lease key: {}", key))]
315    InvalidLeaseKey {
316        key: String,
317        #[snafu(implicit)]
318        location: Location,
319    },
320
321    #[snafu(display("Invalid datanode stat key: {}", key))]
322    InvalidStatKey {
323        key: String,
324        #[snafu(implicit)]
325        location: Location,
326    },
327
328    #[snafu(display("Invalid inactive region key: {}", key))]
329    InvalidInactiveRegionKey {
330        key: String,
331        #[snafu(implicit)]
332        location: Location,
333    },
334
335    #[snafu(display("Failed to parse lease key from utf8"))]
336    LeaseKeyFromUtf8 {
337        #[snafu(source)]
338        error: std::string::FromUtf8Error,
339        #[snafu(implicit)]
340        location: Location,
341    },
342
343    #[snafu(display("Failed to parse lease value from utf8"))]
344    LeaseValueFromUtf8 {
345        #[snafu(source)]
346        error: std::string::FromUtf8Error,
347        #[snafu(implicit)]
348        location: Location,
349    },
350
351    #[snafu(display("Failed to parse invalid region key from utf8"))]
352    InvalidRegionKeyFromUtf8 {
353        #[snafu(source)]
354        error: std::string::FromUtf8Error,
355        #[snafu(implicit)]
356        location: Location,
357    },
358
359    #[snafu(display("Failed to serialize to json: {}", input))]
360    SerializeToJson {
361        input: String,
362        #[snafu(source)]
363        error: serde_json::error::Error,
364        #[snafu(implicit)]
365        location: Location,
366    },
367
368    #[snafu(display("Failed to deserialize from json: {}", input))]
369    DeserializeFromJson {
370        input: String,
371        #[snafu(source)]
372        error: serde_json::error::Error,
373        #[snafu(implicit)]
374        location: Location,
375    },
376
377    #[snafu(display("Failed to parse number: {}", err_msg))]
378    ParseNum {
379        err_msg: String,
380        #[snafu(source)]
381        error: std::num::ParseIntError,
382        #[snafu(implicit)]
383        location: Location,
384    },
385
386    #[snafu(display("Failed to parse bool: {}", err_msg))]
387    ParseBool {
388        err_msg: String,
389        #[snafu(source)]
390        error: std::str::ParseBoolError,
391        #[snafu(implicit)]
392        location: Location,
393    },
394
395    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
396    DowngradeLeader {
397        region_id: RegionId,
398        #[snafu(implicit)]
399        location: Location,
400        #[snafu(source)]
401        source: BoxedError,
402    },
403
404    #[snafu(display("Region's leader peer changed: {}", msg))]
405    LeaderPeerChanged {
406        msg: String,
407        #[snafu(implicit)]
408        location: Location,
409    },
410
411    #[snafu(display("Invalid arguments: {}", err_msg))]
412    InvalidArguments {
413        err_msg: String,
414        #[snafu(implicit)]
415        location: Location,
416    },
417
418    #[cfg(feature = "mysql_kvbackend")]
419    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
420    ParseMySqlUrl {
421        #[snafu(source)]
422        error: sqlx::error::Error,
423        mysql_url: String,
424        #[snafu(implicit)]
425        location: Location,
426    },
427
428    #[cfg(feature = "mysql_kvbackend")]
429    #[snafu(display("Failed to decode sql value"))]
430    DecodeSqlValue {
431        #[snafu(source)]
432        error: sqlx::error::Error,
433        #[snafu(implicit)]
434        location: Location,
435    },
436
437    #[snafu(display("Failed to find table route for {table_id}"))]
438    TableRouteNotFound {
439        table_id: TableId,
440        #[snafu(implicit)]
441        location: Location,
442    },
443
444    #[snafu(display("Failed to find table route for {region_id}"))]
445    RegionRouteNotFound {
446        region_id: RegionId,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("Table info not found: {}", table_id))]
452    TableInfoNotFound {
453        table_id: TableId,
454        #[snafu(implicit)]
455        location: Location,
456    },
457
458    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
459    DatanodeTableNotFound {
460        table_id: TableId,
461        datanode_id: DatanodeId,
462        #[snafu(implicit)]
463        location: Location,
464    },
465
466    #[snafu(display("Metasrv has no leader at this moment"))]
467    NoLeader {
468        #[snafu(implicit)]
469        location: Location,
470    },
471
472    #[snafu(display("Leader lease expired"))]
473    LeaderLeaseExpired {
474        #[snafu(implicit)]
475        location: Location,
476    },
477
478    #[snafu(display("Leader lease changed during election"))]
479    LeaderLeaseChanged {
480        #[snafu(implicit)]
481        location: Location,
482    },
483
484    #[snafu(display("Table {} not found", name))]
485    TableNotFound {
486        name: String,
487        #[snafu(implicit)]
488        location: Location,
489    },
490
491    #[snafu(display("Unsupported selector type, {}", selector_type))]
492    UnsupportedSelectorType {
493        selector_type: String,
494        #[snafu(implicit)]
495        location: Location,
496    },
497
498    #[snafu(display("Unexpected, violated: {violated}"))]
499    Unexpected {
500        violated: String,
501        #[snafu(implicit)]
502        location: Location,
503    },
504
505    #[snafu(display("Failed to create gRPC channel"))]
506    CreateChannel {
507        #[snafu(implicit)]
508        location: Location,
509        source: common_grpc::error::Error,
510    },
511
512    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
513    BatchGet {
514        #[snafu(source)]
515        error: tonic::Status,
516        #[snafu(implicit)]
517        location: Location,
518    },
519
520    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
521    Range {
522        #[snafu(source)]
523        error: tonic::Status,
524        #[snafu(implicit)]
525        location: Location,
526    },
527
528    #[snafu(display("Response header not found"))]
529    ResponseHeaderNotFound {
530        #[snafu(implicit)]
531        location: Location,
532    },
533
534    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
535    IsNotLeader {
536        node_addr: String,
537        #[snafu(implicit)]
538        location: Location,
539    },
540
541    #[snafu(display("Invalid http body"))]
542    InvalidHttpBody {
543        #[snafu(source)]
544        error: http::Error,
545        #[snafu(implicit)]
546        location: Location,
547    },
548
549    #[snafu(display(
550        "The number of retries for the grpc call {} exceeded the limit, {}",
551        func_name,
552        retry_num
553    ))]
554    ExceededRetryLimit {
555        func_name: String,
556        retry_num: usize,
557        #[snafu(implicit)]
558        location: Location,
559    },
560
561    #[snafu(display("Invalid utf-8 value"))]
562    InvalidUtf8Value {
563        #[snafu(source)]
564        error: std::string::FromUtf8Error,
565        #[snafu(implicit)]
566        location: Location,
567    },
568
569    #[snafu(display("Missing required parameter, param: {:?}", param))]
570    MissingRequiredParameter { param: String },
571
572    #[snafu(display("Failed to start procedure manager"))]
573    StartProcedureManager {
574        #[snafu(implicit)]
575        location: Location,
576        source: common_procedure::Error,
577    },
578
579    #[snafu(display("Failed to stop procedure manager"))]
580    StopProcedureManager {
581        #[snafu(implicit)]
582        location: Location,
583        source: common_procedure::Error,
584    },
585
586    #[snafu(display("Failed to wait procedure done"))]
587    WaitProcedure {
588        #[snafu(implicit)]
589        location: Location,
590        source: common_procedure::Error,
591    },
592
593    #[snafu(display("Failed to query procedure state"))]
594    QueryProcedure {
595        #[snafu(implicit)]
596        location: Location,
597        source: common_procedure::Error,
598    },
599
600    #[snafu(display("Procedure not found: {pid}"))]
601    ProcedureNotFound {
602        #[snafu(implicit)]
603        location: Location,
604        pid: String,
605    },
606
607    #[snafu(display("Failed to submit procedure"))]
608    SubmitProcedure {
609        #[snafu(implicit)]
610        location: Location,
611        source: common_procedure::Error,
612    },
613
614    #[snafu(display("A prune task for topic {} is already running", topic))]
615    PruneTaskAlreadyRunning {
616        topic: String,
617        #[snafu(implicit)]
618        location: Location,
619    },
620
621    #[snafu(display("Schema already exists, name: {schema_name}"))]
622    SchemaAlreadyExists {
623        schema_name: String,
624        #[snafu(implicit)]
625        location: Location,
626    },
627
628    #[snafu(display("Table already exists: {table_name}"))]
629    TableAlreadyExists {
630        table_name: String,
631        #[snafu(implicit)]
632        location: Location,
633    },
634
635    #[snafu(display("Pusher not found: {pusher_id}"))]
636    PusherNotFound {
637        pusher_id: String,
638        #[snafu(implicit)]
639        location: Location,
640    },
641
642    #[snafu(display("Failed to push message: {err_msg}"))]
643    PushMessage {
644        err_msg: String,
645        #[snafu(implicit)]
646        location: Location,
647    },
648
649    #[snafu(display("Mailbox already closed: {id}"))]
650    MailboxClosed {
651        id: u64,
652        #[snafu(implicit)]
653        location: Location,
654    },
655
656    #[snafu(display("Mailbox timeout: {id}"))]
657    MailboxTimeout {
658        id: u64,
659        #[snafu(implicit)]
660        location: Location,
661    },
662
663    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
664    MailboxReceiver {
665        id: u64,
666        err_msg: String,
667        #[snafu(implicit)]
668        location: Location,
669    },
670
671    #[snafu(display("Mailbox channel closed: {channel}"))]
672    MailboxChannelClosed {
673        channel: Channel,
674        #[snafu(implicit)]
675        location: Location,
676    },
677
678    #[snafu(display("Missing request header"))]
679    MissingRequestHeader {
680        #[snafu(implicit)]
681        location: Location,
682    },
683
684    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
685    RegisterProcedureLoader {
686        type_name: String,
687        #[snafu(implicit)]
688        location: Location,
689        source: common_procedure::error::Error,
690    },
691
692    #[snafu(display(
693        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
694        mailbox_message,
695        reason
696    ))]
697    UnexpectedInstructionReply {
698        mailbox_message: String,
699        reason: String,
700        #[snafu(implicit)]
701        location: Location,
702    },
703
704    #[snafu(display("Expected to retry later, reason: {}", reason))]
705    RetryLater {
706        reason: String,
707        #[snafu(implicit)]
708        location: Location,
709    },
710
711    #[snafu(display("Expected to retry later, reason: {}", reason))]
712    RetryLaterWithSource {
713        reason: String,
714        #[snafu(implicit)]
715        location: Location,
716        source: BoxedError,
717    },
718
719    #[snafu(display("Failed to convert proto data"))]
720    ConvertProtoData {
721        #[snafu(implicit)]
722        location: Location,
723        source: common_meta::error::Error,
724    },
725
726    // this error is used for custom error mapping
727    // please do not delete it
728    #[snafu(display("Other error"))]
729    Other {
730        source: BoxedError,
731        #[snafu(implicit)]
732        location: Location,
733    },
734
735    #[snafu(display("Table metadata manager error"))]
736    TableMetadataManager {
737        source: common_meta::error::Error,
738        #[snafu(implicit)]
739        location: Location,
740    },
741
742    #[snafu(display("Runtime switch manager error"))]
743    RuntimeSwitchManager {
744        source: common_meta::error::Error,
745        #[snafu(implicit)]
746        location: Location,
747    },
748
749    #[snafu(display("Keyvalue backend error"))]
750    KvBackend {
751        source: common_meta::error::Error,
752        #[snafu(implicit)]
753        location: Location,
754    },
755
756    #[snafu(display("Failed to publish message"))]
757    PublishMessage {
758        #[snafu(source)]
759        error: SendError<Message>,
760        #[snafu(implicit)]
761        location: Location,
762    },
763
764    #[snafu(display("Too many partitions"))]
765    TooManyPartitions {
766        #[snafu(implicit)]
767        location: Location,
768    },
769
770    #[snafu(display("Unsupported operation {}", operation))]
771    Unsupported {
772        operation: String,
773        #[snafu(implicit)]
774        location: Location,
775    },
776
777    #[snafu(display("Unexpected table route type: {}", err_msg))]
778    UnexpectedLogicalRouteTable {
779        #[snafu(implicit)]
780        location: Location,
781        err_msg: String,
782        source: common_meta::error::Error,
783    },
784
785    #[snafu(display("Failed to save cluster info"))]
786    SaveClusterInfo {
787        #[snafu(implicit)]
788        location: Location,
789        source: common_meta::error::Error,
790    },
791
792    #[snafu(display("Invalid cluster info format"))]
793    InvalidClusterInfoFormat {
794        #[snafu(implicit)]
795        location: Location,
796        source: common_meta::error::Error,
797    },
798
799    #[snafu(display("Invalid datanode stat format"))]
800    InvalidDatanodeStatFormat {
801        #[snafu(implicit)]
802        location: Location,
803        source: common_meta::error::Error,
804    },
805
806    #[snafu(display("Invalid node info format"))]
807    InvalidNodeInfoFormat {
808        #[snafu(implicit)]
809        location: Location,
810        source: common_meta::error::Error,
811    },
812
813    #[snafu(display("Failed to serialize options to TOML"))]
814    TomlFormat {
815        #[snafu(implicit)]
816        location: Location,
817        #[snafu(source(from(common_config::error::Error, Box::new)))]
818        source: Box<common_config::error::Error>,
819    },
820
821    #[cfg(feature = "pg_kvbackend")]
822    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
823    PostgresExecution {
824        #[snafu(source)]
825        error: tokio_postgres::Error,
826        sql: String,
827        #[snafu(implicit)]
828        location: Location,
829    },
830
831    #[cfg(feature = "pg_kvbackend")]
832    #[snafu(display("Failed to get Postgres client"))]
833    GetPostgresClient {
834        #[snafu(implicit)]
835        location: Location,
836        #[snafu(source)]
837        error: deadpool::managed::PoolError<tokio_postgres::Error>,
838    },
839
840    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
841    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
842    SqlExecutionTimeout {
843        #[snafu(implicit)]
844        location: Location,
845        sql: String,
846        duration: std::time::Duration,
847    },
848
849    #[cfg(feature = "pg_kvbackend")]
850    #[snafu(display("Failed to create connection pool for Postgres"))]
851    CreatePostgresPool {
852        #[snafu(source)]
853        error: deadpool_postgres::CreatePoolError,
854        #[snafu(implicit)]
855        location: Location,
856    },
857
858    #[cfg(feature = "pg_kvbackend")]
859    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
860    GetPostgresConnection {
861        reason: String,
862        #[snafu(implicit)]
863        location: Location,
864    },
865
866    #[cfg(feature = "mysql_kvbackend")]
867    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
868    MySqlExecution {
869        #[snafu(source)]
870        error: sqlx::Error,
871        #[snafu(implicit)]
872        location: Location,
873        sql: String,
874    },
875
876    #[cfg(feature = "mysql_kvbackend")]
877    #[snafu(display("Failed to create mysql pool"))]
878    CreateMySqlPool {
879        #[snafu(source)]
880        error: sqlx::Error,
881        #[snafu(implicit)]
882        location: Location,
883    },
884
885    #[cfg(feature = "mysql_kvbackend")]
886    #[snafu(display("Failed to acquire mysql client from pool"))]
887    AcquireMySqlClient {
888        #[snafu(source)]
889        error: sqlx::Error,
890        #[snafu(implicit)]
891        location: Location,
892    },
893
894    #[snafu(display("Handler not found: {}", name))]
895    HandlerNotFound {
896        name: String,
897        #[snafu(implicit)]
898        location: Location,
899    },
900
901    #[snafu(display("Flow state handler error"))]
902    FlowStateHandler {
903        #[snafu(implicit)]
904        location: Location,
905        source: common_meta::error::Error,
906    },
907
908    #[snafu(display("Failed to build wal options allocator"))]
909    BuildWalOptionsAllocator {
910        #[snafu(implicit)]
911        location: Location,
912        source: common_meta::error::Error,
913    },
914
915    #[snafu(display("Failed to parse wal options"))]
916    ParseWalOptions {
917        #[snafu(implicit)]
918        location: Location,
919        source: common_meta::error::Error,
920    },
921
922    #[snafu(display("Failed to build kafka client."))]
923    BuildKafkaClient {
924        #[snafu(implicit)]
925        location: Location,
926        #[snafu(source)]
927        error: common_meta::error::Error,
928    },
929
930    #[snafu(display(
931        "Failed to build a Kafka partition client, topic: {}, partition: {}",
932        topic,
933        partition
934    ))]
935    BuildPartitionClient {
936        topic: String,
937        partition: i32,
938        #[snafu(implicit)]
939        location: Location,
940        #[snafu(source)]
941        error: rskafka::client::error::Error,
942    },
943
944    #[snafu(display(
945        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
946        topic,
947        partition,
948        offset
949    ))]
950    DeleteRecords {
951        #[snafu(implicit)]
952        location: Location,
953        #[snafu(source)]
954        error: rskafka::client::error::Error,
955        topic: String,
956        partition: i32,
957        offset: u64,
958    },
959
960    #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
961    GetOffset {
962        topic: String,
963        #[snafu(implicit)]
964        location: Location,
965        #[snafu(source)]
966        error: rskafka::client::error::Error,
967    },
968
969    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
970    UpdateTopicNameValue {
971        topic: String,
972        #[snafu(implicit)]
973        location: Location,
974        #[snafu(source)]
975        source: common_meta::error::Error,
976    },
977
978    #[snafu(display("Failed to build tls options"))]
979    BuildTlsOptions {
980        #[snafu(implicit)]
981        location: Location,
982        #[snafu(source)]
983        source: common_meta::error::Error,
984    },
985}
986
987impl Error {
988    /// Returns `true` if the error is retryable.
989    pub fn is_retryable(&self) -> bool {
990        matches!(
991            self,
992            Error::RetryLater { .. }
993                | Error::RetryLaterWithSource { .. }
994                | Error::MailboxTimeout { .. }
995        )
996    }
997}
998
/// Shorthand for [`std::result::Result`] with the error type fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
1000
// Invokes `common_error::define_into_tonic_status!` for `Error`.
// NOTE(review): presumably this generates the conversion from `Error` into `tonic::Status` —
// confirm against the macro definition in `common_error`.
define_into_tonic_status!(Error);
1002
1003impl ErrorExt for Error {
1004    fn status_code(&self) -> StatusCode {
1005        match self {
1006            Error::EtcdFailed { .. }
1007            | Error::ConnectEtcd { .. }
1008            | Error::FileIo { .. }
1009            | Error::TcpBind { .. }
1010            | Error::SerializeToJson { .. }
1011            | Error::DeserializeFromJson { .. }
1012            | Error::NoLeader { .. }
1013            | Error::LeaderLeaseExpired { .. }
1014            | Error::LeaderLeaseChanged { .. }
1015            | Error::CreateChannel { .. }
1016            | Error::BatchGet { .. }
1017            | Error::Range { .. }
1018            | Error::ResponseHeaderNotFound { .. }
1019            | Error::InvalidHttpBody { .. }
1020            | Error::ExceededRetryLimit { .. }
1021            | Error::SendShutdownSignal { .. }
1022            | Error::PushMessage { .. }
1023            | Error::MailboxClosed { .. }
1024            | Error::MailboxReceiver { .. }
1025            | Error::StartGrpc { .. }
1026            | Error::PublishMessage { .. }
1027            | Error::Join { .. }
1028            | Error::ChooseItems { .. }
1029            | Error::FlowStateHandler { .. }
1030            | Error::BuildWalOptionsAllocator { .. }
1031            | Error::BuildPartitionClient { .. }
1032            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1033
1034            Error::DeleteRecords { .. }
1035            | Error::GetOffset { .. }
1036            | Error::PeerUnavailable { .. }
1037            | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1038            Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1039            Error::PruneTaskAlreadyRunning { .. }
1040            | Error::RetryLater { .. }
1041            | Error::MailboxChannelClosed { .. }
1042            | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1043            Error::RetryLaterWithSource { source, .. } => source.status_code(),
1044
1045            Error::Unsupported { .. } => StatusCode::Unsupported,
1046
1047            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1048
1049            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1050            Error::EmptyKey { .. }
1051            | Error::MissingRequiredParameter { .. }
1052            | Error::MissingRequestHeader { .. }
1053            | Error::InvalidLeaseKey { .. }
1054            | Error::InvalidStatKey { .. }
1055            | Error::InvalidInactiveRegionKey { .. }
1056            | Error::ParseNum { .. }
1057            | Error::ParseBool { .. }
1058            | Error::ParseAddr { .. }
1059            | Error::UnsupportedSelectorType { .. }
1060            | Error::InvalidArguments { .. }
1061            | Error::ProcedureNotFound { .. }
1062            | Error::TooManyPartitions { .. }
1063            | Error::TomlFormat { .. }
1064            | Error::HandlerNotFound { .. }
1065            | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
1066            Error::LeaseKeyFromUtf8 { .. }
1067            | Error::LeaseValueFromUtf8 { .. }
1068            | Error::InvalidRegionKeyFromUtf8 { .. }
1069            | Error::TableRouteNotFound { .. }
1070            | Error::TableInfoNotFound { .. }
1071            | Error::DatanodeTableNotFound { .. }
1072            | Error::InvalidUtf8Value { .. }
1073            | Error::UnexpectedInstructionReply { .. }
1074            | Error::Unexpected { .. }
1075            | Error::RegionOpeningRace { .. }
1076            | Error::RegionRouteNotFound { .. }
1077            | Error::MigrationAbort { .. }
1078            | Error::MigrationRunning { .. }
1079            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1080            Error::TableNotFound { .. } => StatusCode::TableNotFound,
1081            Error::SaveClusterInfo { source, .. }
1082            | Error::InvalidClusterInfoFormat { source, .. }
1083            | Error::InvalidDatanodeStatFormat { source, .. }
1084            | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1085            Error::InvalidateTableCache { source, .. } => source.status_code(),
1086            Error::SubmitProcedure { source, .. }
1087            | Error::WaitProcedure { source, .. }
1088            | Error::QueryProcedure { source, .. } => source.status_code(),
1089            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1090                source.status_code()
1091            }
1092            Error::StartProcedureManager { source, .. }
1093            | Error::StopProcedureManager { source, .. } => source.status_code(),
1094
1095            Error::ListCatalogs { source, .. }
1096            | Error::ListSchemas { source, .. }
1097            | Error::ListTables { source, .. } => source.status_code(),
1098            Error::StartTelemetryTask { source, .. } => source.status_code(),
1099
1100            Error::NextSequence { source, .. }
1101            | Error::SetNextSequence { source, .. }
1102            | Error::PeekSequence { source, .. } => source.status_code(),
1103            Error::DowngradeLeader { source, .. } => source.status_code(),
1104            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1105            Error::SubmitDdlTask { source, .. }
1106            | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1107            Error::ConvertProtoData { source, .. }
1108            | Error::TableMetadataManager { source, .. }
1109            | Error::RuntimeSwitchManager { source, .. }
1110            | Error::KvBackend { source, .. }
1111            | Error::UnexpectedLogicalRouteTable { source, .. }
1112            | Error::UpdateTopicNameValue { source, .. }
1113            | Error::ParseWalOptions { source, .. } => source.status_code(),
1114            Error::ListActiveFrontends { source, .. }
1115            | Error::ListActiveDatanodes { source, .. }
1116            | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1117            Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1118
1119            Error::InitMetadata { source, .. }
1120            | Error::InitDdlManager { source, .. }
1121            | Error::InitReconciliationManager { source, .. } => source.status_code(),
1122
1123            Error::BuildTlsOptions { source, .. } => source.status_code(),
1124            Error::Other { source, .. } => source.status_code(),
1125            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1126
1127            #[cfg(feature = "pg_kvbackend")]
1128            Error::CreatePostgresPool { .. }
1129            | Error::GetPostgresClient { .. }
1130            | Error::GetPostgresConnection { .. }
1131            | Error::PostgresExecution { .. } => StatusCode::Internal,
1132            #[cfg(feature = "mysql_kvbackend")]
1133            Error::MySqlExecution { .. }
1134            | Error::CreateMySqlPool { .. }
1135            | Error::ParseMySqlUrl { .. }
1136            | Error::DecodeSqlValue { .. }
1137            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1138            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1139            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1140        }
1141    }
1142
    // Returns this value as a `&dyn Any` trait object, which allows callers to
    // attempt a downcast back to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
1146}
1147
// Adapted from tonic.
1149pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1150    let mut err: &(dyn std::error::Error + 'static) = err_status;
1151
1152    loop {
1153        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1154            return Some(io_err);
1155        }
1156
1157        // h2::Error do not expose std::io::Error with `source()`
1158        // https://github.com/hyperium/h2/pull/462
1159        if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1160            && let Some(io_err) = h2_err.get_io()
1161        {
1162            return Some(io_err);
1163        }
1164
1165        err = err.source()?;
1166    }
1167}