meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Failed to choose items"))]
36    ChooseItems {
37        #[snafu(implicit)]
38        location: Location,
39        #[snafu(source)]
40        error: rand::distr::weighted::Error,
41    },
42
43    #[snafu(display("Exceeded deadline, operation: {}", operation))]
44    ExceededDeadline {
45        #[snafu(implicit)]
46        location: Location,
47        operation: String,
48    },
49
50    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51    PeerUnavailable {
52        #[snafu(implicit)]
53        location: Location,
54        peer_id: u64,
55    },
56
57    #[snafu(display("Failed to list active frontends"))]
58    ListActiveFrontends {
59        #[snafu(implicit)]
60        location: Location,
61        source: common_meta::error::Error,
62    },
63
64    #[snafu(display("Failed to list active datanodes"))]
65    ListActiveDatanodes {
66        #[snafu(implicit)]
67        location: Location,
68        source: common_meta::error::Error,
69    },
70
71    #[snafu(display("Failed to list active flownodes"))]
72    ListActiveFlownodes {
73        #[snafu(implicit)]
74        location: Location,
75        source: common_meta::error::Error,
76    },
77
78    #[snafu(display("No available frontend"))]
79    NoAvailableFrontend {
80        #[snafu(implicit)]
81        location: Location,
82    },
83
84    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
85    MigrationRunning {
86        #[snafu(implicit)]
87        location: Location,
88        region_id: RegionId,
89    },
90
91    #[snafu(display(
92        "The region migration procedure is completed for region: {}, target_peer: {}",
93        region_id,
94        target_peer_id
95    ))]
96    RegionMigrated {
97        #[snafu(implicit)]
98        location: Location,
99        region_id: RegionId,
100        target_peer_id: u64,
101    },
102
103    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
104    MigrationAbort {
105        #[snafu(implicit)]
106        location: Location,
107        reason: String,
108    },
109
110    #[snafu(display(
111        "Another procedure is opening the region: {} on peer: {}",
112        region_id,
113        peer_id
114    ))]
115    RegionOpeningRace {
116        #[snafu(implicit)]
117        location: Location,
118        peer_id: DatanodeId,
119        region_id: RegionId,
120    },
121
122    #[snafu(display("Failed to init ddl manager"))]
123    InitDdlManager {
124        #[snafu(implicit)]
125        location: Location,
126        source: common_meta::error::Error,
127    },
128
129    #[snafu(display("Failed to init reconciliation manager"))]
130    InitReconciliationManager {
131        #[snafu(implicit)]
132        location: Location,
133        source: common_meta::error::Error,
134    },
135
136    #[snafu(display("Failed to create default catalog and schema"))]
137    InitMetadata {
138        #[snafu(implicit)]
139        location: Location,
140        source: common_meta::error::Error,
141    },
142
143    #[snafu(display("Failed to allocate next sequence number"))]
144    NextSequence {
145        #[snafu(implicit)]
146        location: Location,
147        source: common_meta::error::Error,
148    },
149
150    #[snafu(display("Failed to set next sequence number"))]
151    SetNextSequence {
152        #[snafu(implicit)]
153        location: Location,
154        source: common_meta::error::Error,
155    },
156
157    #[snafu(display("Failed to peek sequence number"))]
158    PeekSequence {
159        #[snafu(implicit)]
160        location: Location,
161        source: common_meta::error::Error,
162    },
163
164    #[snafu(display("Failed to start telemetry task"))]
165    StartTelemetryTask {
166        #[snafu(implicit)]
167        location: Location,
168        source: common_runtime::error::Error,
169    },
170
171    #[snafu(display("Failed to submit ddl task"))]
172    SubmitDdlTask {
173        #[snafu(implicit)]
174        location: Location,
175        source: common_meta::error::Error,
176    },
177
178    #[snafu(display("Failed to submit reconcile procedure"))]
179    SubmitReconcileProcedure {
180        #[snafu(implicit)]
181        location: Location,
182        source: common_meta::error::Error,
183    },
184
185    #[snafu(display("Failed to invalidate table cache"))]
186    InvalidateTableCache {
187        #[snafu(implicit)]
188        location: Location,
189        source: common_meta::error::Error,
190    },
191
192    #[snafu(display("Failed to list catalogs"))]
193    ListCatalogs {
194        #[snafu(implicit)]
195        location: Location,
196        source: BoxedError,
197    },
198
199    #[snafu(display("Failed to list {}'s schemas", catalog))]
200    ListSchemas {
201        #[snafu(implicit)]
202        location: Location,
203        catalog: String,
204        source: BoxedError,
205    },
206
207    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
208    ListTables {
209        #[snafu(implicit)]
210        location: Location,
211        catalog: String,
212        schema: String,
213        source: BoxedError,
214    },
215
216    #[snafu(display("Failed to join a future"))]
217    Join {
218        #[snafu(implicit)]
219        location: Location,
220        #[snafu(source)]
221        error: JoinError,
222    },
223
224    #[snafu(display(
225        "Failed to request {}, required: {}, but only {} available",
226        select_target,
227        required,
228        available
229    ))]
230    NoEnoughAvailableNode {
231        #[snafu(implicit)]
232        location: Location,
233        required: usize,
234        available: usize,
235        select_target: SelectTarget,
236    },
237
238    #[snafu(display("Failed to send shutdown signal"))]
239    SendShutdownSignal {
240        #[snafu(source)]
241        error: SendError<()>,
242    },
243
244    #[snafu(display("Failed to shutdown {} server", server))]
245    ShutdownServer {
246        #[snafu(implicit)]
247        location: Location,
248        source: servers::error::Error,
249        server: String,
250    },
251
252    #[snafu(display("Empty key is not allowed"))]
253    EmptyKey {
254        #[snafu(implicit)]
255        location: Location,
256    },
257
258    #[snafu(display("Failed to execute via Etcd"))]
259    EtcdFailed {
260        #[snafu(source)]
261        error: etcd_client::Error,
262        #[snafu(implicit)]
263        location: Location,
264    },
265
266    #[snafu(display("Failed to connect to Etcd"))]
267    ConnectEtcd {
268        #[snafu(source)]
269        error: etcd_client::Error,
270        #[snafu(implicit)]
271        location: Location,
272    },
273
274    #[snafu(display("Failed to read file: {}", path))]
275    FileIo {
276        #[snafu(source)]
277        error: std::io::Error,
278        #[snafu(implicit)]
279        location: Location,
280        path: String,
281    },
282
283    #[snafu(display("Failed to bind address {}", addr))]
284    TcpBind {
285        addr: String,
286        #[snafu(source)]
287        error: std::io::Error,
288        #[snafu(implicit)]
289        location: Location,
290    },
291
292    #[snafu(display("Failed to start gRPC server"))]
293    StartGrpc {
294        #[snafu(source)]
295        error: tonic::transport::Error,
296        #[snafu(implicit)]
297        location: Location,
298    },
299
300    #[snafu(display("Failed to start http server"))]
301    StartHttp {
302        #[snafu(implicit)]
303        location: Location,
304        source: servers::error::Error,
305    },
306
307    #[snafu(display("Failed to init export metrics task"))]
308    InitExportMetricsTask {
309        #[snafu(implicit)]
310        location: Location,
311        source: servers::error::Error,
312    },
313
314    #[snafu(display("Failed to parse address {}", addr))]
315    ParseAddr {
316        addr: String,
317        #[snafu(source)]
318        error: std::net::AddrParseError,
319    },
320
321    #[snafu(display("Invalid lease key: {}", key))]
322    InvalidLeaseKey {
323        key: String,
324        #[snafu(implicit)]
325        location: Location,
326    },
327
328    #[snafu(display("Invalid datanode stat key: {}", key))]
329    InvalidStatKey {
330        key: String,
331        #[snafu(implicit)]
332        location: Location,
333    },
334
335    #[snafu(display("Invalid inactive region key: {}", key))]
336    InvalidInactiveRegionKey {
337        key: String,
338        #[snafu(implicit)]
339        location: Location,
340    },
341
342    #[snafu(display("Failed to parse lease key from utf8"))]
343    LeaseKeyFromUtf8 {
344        #[snafu(source)]
345        error: std::string::FromUtf8Error,
346        #[snafu(implicit)]
347        location: Location,
348    },
349
350    #[snafu(display("Failed to parse lease value from utf8"))]
351    LeaseValueFromUtf8 {
352        #[snafu(source)]
353        error: std::string::FromUtf8Error,
354        #[snafu(implicit)]
355        location: Location,
356    },
357
358    #[snafu(display("Failed to parse invalid region key from utf8"))]
359    InvalidRegionKeyFromUtf8 {
360        #[snafu(source)]
361        error: std::string::FromUtf8Error,
362        #[snafu(implicit)]
363        location: Location,
364    },
365
366    #[snafu(display("Failed to serialize to json: {}", input))]
367    SerializeToJson {
368        input: String,
369        #[snafu(source)]
370        error: serde_json::error::Error,
371        #[snafu(implicit)]
372        location: Location,
373    },
374
375    #[snafu(display("Failed to deserialize from json: {}", input))]
376    DeserializeFromJson {
377        input: String,
378        #[snafu(source)]
379        error: serde_json::error::Error,
380        #[snafu(implicit)]
381        location: Location,
382    },
383
384    #[snafu(display("Failed to parse number: {}", err_msg))]
385    ParseNum {
386        err_msg: String,
387        #[snafu(source)]
388        error: std::num::ParseIntError,
389        #[snafu(implicit)]
390        location: Location,
391    },
392
393    #[snafu(display("Failed to parse bool: {}", err_msg))]
394    ParseBool {
395        err_msg: String,
396        #[snafu(source)]
397        error: std::str::ParseBoolError,
398        #[snafu(implicit)]
399        location: Location,
400    },
401
402    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
403    DowngradeLeader {
404        region_id: RegionId,
405        #[snafu(implicit)]
406        location: Location,
407        #[snafu(source)]
408        source: BoxedError,
409    },
410
411    #[snafu(display("Region's leader peer changed: {}", msg))]
412    LeaderPeerChanged {
413        msg: String,
414        #[snafu(implicit)]
415        location: Location,
416    },
417
418    #[snafu(display("Invalid arguments: {}", err_msg))]
419    InvalidArguments {
420        err_msg: String,
421        #[snafu(implicit)]
422        location: Location,
423    },
424
425    #[cfg(feature = "mysql_kvbackend")]
426    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
427    ParseMySqlUrl {
428        #[snafu(source)]
429        error: sqlx::error::Error,
430        mysql_url: String,
431        #[snafu(implicit)]
432        location: Location,
433    },
434
435    #[cfg(feature = "mysql_kvbackend")]
436    #[snafu(display("Failed to decode sql value"))]
437    DecodeSqlValue {
438        #[snafu(source)]
439        error: sqlx::error::Error,
440        #[snafu(implicit)]
441        location: Location,
442    },
443
444    #[snafu(display("Failed to find table route for {table_id}"))]
445    TableRouteNotFound {
446        table_id: TableId,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("Failed to find table route for {region_id}"))]
452    RegionRouteNotFound {
453        region_id: RegionId,
454        #[snafu(implicit)]
455        location: Location,
456    },
457
458    #[snafu(display("Table info not found: {}", table_id))]
459    TableInfoNotFound {
460        table_id: TableId,
461        #[snafu(implicit)]
462        location: Location,
463    },
464
465    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
466    DatanodeTableNotFound {
467        table_id: TableId,
468        datanode_id: DatanodeId,
469        #[snafu(implicit)]
470        location: Location,
471    },
472
473    #[snafu(display("Metasrv has no leader at this moment"))]
474    NoLeader {
475        #[snafu(implicit)]
476        location: Location,
477    },
478
479    #[snafu(display("Leader lease expired"))]
480    LeaderLeaseExpired {
481        #[snafu(implicit)]
482        location: Location,
483    },
484
485    #[snafu(display("Leader lease changed during election"))]
486    LeaderLeaseChanged {
487        #[snafu(implicit)]
488        location: Location,
489    },
490
491    #[snafu(display("Table {} not found", name))]
492    TableNotFound {
493        name: String,
494        #[snafu(implicit)]
495        location: Location,
496    },
497
498    #[snafu(display("Unsupported selector type, {}", selector_type))]
499    UnsupportedSelectorType {
500        selector_type: String,
501        #[snafu(implicit)]
502        location: Location,
503    },
504
505    #[snafu(display("Unexpected, violated: {violated}"))]
506    Unexpected {
507        violated: String,
508        #[snafu(implicit)]
509        location: Location,
510    },
511
512    #[snafu(display("Failed to create gRPC channel"))]
513    CreateChannel {
514        #[snafu(implicit)]
515        location: Location,
516        source: common_grpc::error::Error,
517    },
518
519    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
520    BatchGet {
521        #[snafu(source)]
522        error: tonic::Status,
523        #[snafu(implicit)]
524        location: Location,
525    },
526
527    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
528    Range {
529        #[snafu(source)]
530        error: tonic::Status,
531        #[snafu(implicit)]
532        location: Location,
533    },
534
535    #[snafu(display("Response header not found"))]
536    ResponseHeaderNotFound {
537        #[snafu(implicit)]
538        location: Location,
539    },
540
541    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
542    IsNotLeader {
543        node_addr: String,
544        #[snafu(implicit)]
545        location: Location,
546    },
547
548    #[snafu(display("Invalid http body"))]
549    InvalidHttpBody {
550        #[snafu(source)]
551        error: http::Error,
552        #[snafu(implicit)]
553        location: Location,
554    },
555
556    #[snafu(display(
557        "The number of retries for the grpc call {} exceeded the limit, {}",
558        func_name,
559        retry_num
560    ))]
561    ExceededRetryLimit {
562        func_name: String,
563        retry_num: usize,
564        #[snafu(implicit)]
565        location: Location,
566    },
567
568    #[snafu(display("Invalid utf-8 value"))]
569    InvalidUtf8Value {
570        #[snafu(source)]
571        error: std::string::FromUtf8Error,
572        #[snafu(implicit)]
573        location: Location,
574    },
575
576    #[snafu(display("Missing required parameter, param: {:?}", param))]
577    MissingRequiredParameter { param: String },
578
579    #[snafu(display("Failed to start procedure manager"))]
580    StartProcedureManager {
581        #[snafu(implicit)]
582        location: Location,
583        source: common_procedure::Error,
584    },
585
586    #[snafu(display("Failed to stop procedure manager"))]
587    StopProcedureManager {
588        #[snafu(implicit)]
589        location: Location,
590        source: common_procedure::Error,
591    },
592
593    #[snafu(display("Failed to wait procedure done"))]
594    WaitProcedure {
595        #[snafu(implicit)]
596        location: Location,
597        source: common_procedure::Error,
598    },
599
600    #[snafu(display("Failed to query procedure state"))]
601    QueryProcedure {
602        #[snafu(implicit)]
603        location: Location,
604        source: common_procedure::Error,
605    },
606
607    #[snafu(display("Procedure not found: {pid}"))]
608    ProcedureNotFound {
609        #[snafu(implicit)]
610        location: Location,
611        pid: String,
612    },
613
614    #[snafu(display("Failed to submit procedure"))]
615    SubmitProcedure {
616        #[snafu(implicit)]
617        location: Location,
618        source: common_procedure::Error,
619    },
620
621    #[snafu(display("A prune task for topic {} is already running", topic))]
622    PruneTaskAlreadyRunning {
623        topic: String,
624        #[snafu(implicit)]
625        location: Location,
626    },
627
628    #[snafu(display("Schema already exists, name: {schema_name}"))]
629    SchemaAlreadyExists {
630        schema_name: String,
631        #[snafu(implicit)]
632        location: Location,
633    },
634
635    #[snafu(display("Table already exists: {table_name}"))]
636    TableAlreadyExists {
637        table_name: String,
638        #[snafu(implicit)]
639        location: Location,
640    },
641
642    #[snafu(display("Pusher not found: {pusher_id}"))]
643    PusherNotFound {
644        pusher_id: String,
645        #[snafu(implicit)]
646        location: Location,
647    },
648
649    #[snafu(display("Failed to push message: {err_msg}"))]
650    PushMessage {
651        err_msg: String,
652        #[snafu(implicit)]
653        location: Location,
654    },
655
656    #[snafu(display("Mailbox already closed: {id}"))]
657    MailboxClosed {
658        id: u64,
659        #[snafu(implicit)]
660        location: Location,
661    },
662
663    #[snafu(display("Mailbox timeout: {id}"))]
664    MailboxTimeout {
665        id: u64,
666        #[snafu(implicit)]
667        location: Location,
668    },
669
670    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
671    MailboxReceiver {
672        id: u64,
673        err_msg: String,
674        #[snafu(implicit)]
675        location: Location,
676    },
677
678    #[snafu(display("Mailbox channel closed: {channel}"))]
679    MailboxChannelClosed {
680        channel: Channel,
681        #[snafu(implicit)]
682        location: Location,
683    },
684
685    #[snafu(display("Missing request header"))]
686    MissingRequestHeader {
687        #[snafu(implicit)]
688        location: Location,
689    },
690
691    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
692    RegisterProcedureLoader {
693        type_name: String,
694        #[snafu(implicit)]
695        location: Location,
696        source: common_procedure::error::Error,
697    },
698
699    #[snafu(display(
700        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
701        mailbox_message,
702        reason
703    ))]
704    UnexpectedInstructionReply {
705        mailbox_message: String,
706        reason: String,
707        #[snafu(implicit)]
708        location: Location,
709    },
710
711    #[snafu(display("Expected to retry later, reason: {}", reason))]
712    RetryLater {
713        reason: String,
714        #[snafu(implicit)]
715        location: Location,
716    },
717
718    #[snafu(display("Expected to retry later, reason: {}", reason))]
719    RetryLaterWithSource {
720        reason: String,
721        #[snafu(implicit)]
722        location: Location,
723        source: BoxedError,
724    },
725
726    #[snafu(display("Failed to convert proto data"))]
727    ConvertProtoData {
728        #[snafu(implicit)]
729        location: Location,
730        source: common_meta::error::Error,
731    },
732
733    // this error is used for custom error mapping
734    // please do not delete it
735    #[snafu(display("Other error"))]
736    Other {
737        source: BoxedError,
738        #[snafu(implicit)]
739        location: Location,
740    },
741
742    #[snafu(display("Table metadata manager error"))]
743    TableMetadataManager {
744        source: common_meta::error::Error,
745        #[snafu(implicit)]
746        location: Location,
747    },
748
749    #[snafu(display("Runtime switch manager error"))]
750    RuntimeSwitchManager {
751        source: common_meta::error::Error,
752        #[snafu(implicit)]
753        location: Location,
754    },
755
756    #[snafu(display("Keyvalue backend error"))]
757    KvBackend {
758        source: common_meta::error::Error,
759        #[snafu(implicit)]
760        location: Location,
761    },
762
763    #[snafu(display("Failed to publish message"))]
764    PublishMessage {
765        #[snafu(source)]
766        error: SendError<Message>,
767        #[snafu(implicit)]
768        location: Location,
769    },
770
771    #[snafu(display("Too many partitions"))]
772    TooManyPartitions {
773        #[snafu(implicit)]
774        location: Location,
775    },
776
777    #[snafu(display("Unsupported operation {}", operation))]
778    Unsupported {
779        operation: String,
780        #[snafu(implicit)]
781        location: Location,
782    },
783
784    #[snafu(display("Unexpected table route type: {}", err_msg))]
785    UnexpectedLogicalRouteTable {
786        #[snafu(implicit)]
787        location: Location,
788        err_msg: String,
789        source: common_meta::error::Error,
790    },
791
792    #[snafu(display("Failed to save cluster info"))]
793    SaveClusterInfo {
794        #[snafu(implicit)]
795        location: Location,
796        source: common_meta::error::Error,
797    },
798
799    #[snafu(display("Invalid cluster info format"))]
800    InvalidClusterInfoFormat {
801        #[snafu(implicit)]
802        location: Location,
803        source: common_meta::error::Error,
804    },
805
806    #[snafu(display("Invalid datanode stat format"))]
807    InvalidDatanodeStatFormat {
808        #[snafu(implicit)]
809        location: Location,
810        source: common_meta::error::Error,
811    },
812
813    #[snafu(display("Invalid node info format"))]
814    InvalidNodeInfoFormat {
815        #[snafu(implicit)]
816        location: Location,
817        source: common_meta::error::Error,
818    },
819
820    #[snafu(display("Failed to serialize options to TOML"))]
821    TomlFormat {
822        #[snafu(implicit)]
823        location: Location,
824        #[snafu(source(from(common_config::error::Error, Box::new)))]
825        source: Box<common_config::error::Error>,
826    },
827
828    #[cfg(feature = "pg_kvbackend")]
829    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
830    PostgresExecution {
831        #[snafu(source)]
832        error: tokio_postgres::Error,
833        sql: String,
834        #[snafu(implicit)]
835        location: Location,
836    },
837
838    #[cfg(feature = "pg_kvbackend")]
839    #[snafu(display("Failed to get Postgres client"))]
840    GetPostgresClient {
841        #[snafu(implicit)]
842        location: Location,
843        #[snafu(source)]
844        error: deadpool::managed::PoolError<tokio_postgres::Error>,
845    },
846
847    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
848    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
849    SqlExecutionTimeout {
850        #[snafu(implicit)]
851        location: Location,
852        sql: String,
853        duration: std::time::Duration,
854    },
855
856    #[cfg(feature = "pg_kvbackend")]
857    #[snafu(display("Failed to create connection pool for Postgres"))]
858    CreatePostgresPool {
859        #[snafu(source)]
860        error: deadpool_postgres::CreatePoolError,
861        #[snafu(implicit)]
862        location: Location,
863    },
864
865    #[cfg(feature = "pg_kvbackend")]
866    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
867    GetPostgresConnection {
868        reason: String,
869        #[snafu(implicit)]
870        location: Location,
871    },
872
873    #[cfg(feature = "mysql_kvbackend")]
874    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
875    MySqlExecution {
876        #[snafu(source)]
877        error: sqlx::Error,
878        #[snafu(implicit)]
879        location: Location,
880        sql: String,
881    },
882
883    #[cfg(feature = "mysql_kvbackend")]
884    #[snafu(display("Failed to create mysql pool"))]
885    CreateMySqlPool {
886        #[snafu(source)]
887        error: sqlx::Error,
888        #[snafu(implicit)]
889        location: Location,
890    },
891
892    #[cfg(feature = "mysql_kvbackend")]
893    #[snafu(display("Failed to acquire mysql client from pool"))]
894    AcquireMySqlClient {
895        #[snafu(source)]
896        error: sqlx::Error,
897        #[snafu(implicit)]
898        location: Location,
899    },
900
901    #[snafu(display("Handler not found: {}", name))]
902    HandlerNotFound {
903        name: String,
904        #[snafu(implicit)]
905        location: Location,
906    },
907
908    #[snafu(display("Flow state handler error"))]
909    FlowStateHandler {
910        #[snafu(implicit)]
911        location: Location,
912        source: common_meta::error::Error,
913    },
914
915    #[snafu(display("Failed to build wal options allocator"))]
916    BuildWalOptionsAllocator {
917        #[snafu(implicit)]
918        location: Location,
919        source: common_meta::error::Error,
920    },
921
922    #[snafu(display("Failed to parse wal options"))]
923    ParseWalOptions {
924        #[snafu(implicit)]
925        location: Location,
926        source: common_meta::error::Error,
927    },
928
929    #[snafu(display("Failed to build kafka client."))]
930    BuildKafkaClient {
931        #[snafu(implicit)]
932        location: Location,
933        #[snafu(source)]
934        error: common_meta::error::Error,
935    },
936
937    #[snafu(display(
938        "Failed to build a Kafka partition client, topic: {}, partition: {}",
939        topic,
940        partition
941    ))]
942    BuildPartitionClient {
943        topic: String,
944        partition: i32,
945        #[snafu(implicit)]
946        location: Location,
947        #[snafu(source)]
948        error: rskafka::client::error::Error,
949    },
950
951    #[snafu(display(
952        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
953        topic,
954        partition,
955        offset
956    ))]
957    DeleteRecords {
958        #[snafu(implicit)]
959        location: Location,
960        #[snafu(source)]
961        error: rskafka::client::error::Error,
962        topic: String,
963        partition: i32,
964        offset: u64,
965    },
966
967    #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
968    GetOffset {
969        topic: String,
970        #[snafu(implicit)]
971        location: Location,
972        #[snafu(source)]
973        error: rskafka::client::error::Error,
974    },
975
976    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
977    UpdateTopicNameValue {
978        topic: String,
979        #[snafu(implicit)]
980        location: Location,
981        #[snafu(source)]
982        source: common_meta::error::Error,
983    },
984
985    #[snafu(display("Failed to build tls options"))]
986    BuildTlsOptions {
987        #[snafu(implicit)]
988        location: Location,
989        #[snafu(source)]
990        source: common_meta::error::Error,
991    },
992}
993
994impl Error {
995    /// Returns `true` if the error is retryable.
996    pub fn is_retryable(&self) -> bool {
997        matches!(self, Error::RetryLater { .. })
998            || matches!(self, Error::RetryLaterWithSource { .. })
999    }
1000}
1001
1002pub type Result<T> = std::result::Result<T, Error>;
1003
1004define_into_tonic_status!(Error);
1005
1006impl ErrorExt for Error {
1007    fn status_code(&self) -> StatusCode {
1008        match self {
1009            Error::EtcdFailed { .. }
1010            | Error::ConnectEtcd { .. }
1011            | Error::FileIo { .. }
1012            | Error::TcpBind { .. }
1013            | Error::SerializeToJson { .. }
1014            | Error::DeserializeFromJson { .. }
1015            | Error::NoLeader { .. }
1016            | Error::LeaderLeaseExpired { .. }
1017            | Error::LeaderLeaseChanged { .. }
1018            | Error::CreateChannel { .. }
1019            | Error::BatchGet { .. }
1020            | Error::Range { .. }
1021            | Error::ResponseHeaderNotFound { .. }
1022            | Error::InvalidHttpBody { .. }
1023            | Error::ExceededRetryLimit { .. }
1024            | Error::SendShutdownSignal { .. }
1025            | Error::PushMessage { .. }
1026            | Error::MailboxClosed { .. }
1027            | Error::MailboxReceiver { .. }
1028            | Error::StartGrpc { .. }
1029            | Error::PublishMessage { .. }
1030            | Error::Join { .. }
1031            | Error::ChooseItems { .. }
1032            | Error::FlowStateHandler { .. }
1033            | Error::BuildWalOptionsAllocator { .. }
1034            | Error::BuildPartitionClient { .. }
1035            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1036
1037            Error::DeleteRecords { .. }
1038            | Error::GetOffset { .. }
1039            | Error::PeerUnavailable { .. }
1040            | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1041            Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1042            Error::PruneTaskAlreadyRunning { .. }
1043            | Error::RetryLater { .. }
1044            | Error::MailboxChannelClosed { .. }
1045            | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1046            Error::RetryLaterWithSource { source, .. } => source.status_code(),
1047
1048            Error::Unsupported { .. } => StatusCode::Unsupported,
1049
1050            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1051
1052            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1053            Error::EmptyKey { .. }
1054            | Error::MissingRequiredParameter { .. }
1055            | Error::MissingRequestHeader { .. }
1056            | Error::InvalidLeaseKey { .. }
1057            | Error::InvalidStatKey { .. }
1058            | Error::InvalidInactiveRegionKey { .. }
1059            | Error::ParseNum { .. }
1060            | Error::ParseBool { .. }
1061            | Error::ParseAddr { .. }
1062            | Error::UnsupportedSelectorType { .. }
1063            | Error::InvalidArguments { .. }
1064            | Error::InitExportMetricsTask { .. }
1065            | Error::ProcedureNotFound { .. }
1066            | Error::TooManyPartitions { .. }
1067            | Error::TomlFormat { .. }
1068            | Error::HandlerNotFound { .. }
1069            | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
1070            Error::LeaseKeyFromUtf8 { .. }
1071            | Error::LeaseValueFromUtf8 { .. }
1072            | Error::InvalidRegionKeyFromUtf8 { .. }
1073            | Error::TableRouteNotFound { .. }
1074            | Error::TableInfoNotFound { .. }
1075            | Error::DatanodeTableNotFound { .. }
1076            | Error::InvalidUtf8Value { .. }
1077            | Error::UnexpectedInstructionReply { .. }
1078            | Error::Unexpected { .. }
1079            | Error::RegionOpeningRace { .. }
1080            | Error::RegionRouteNotFound { .. }
1081            | Error::MigrationAbort { .. }
1082            | Error::MigrationRunning { .. }
1083            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1084            Error::TableNotFound { .. } => StatusCode::TableNotFound,
1085            Error::SaveClusterInfo { source, .. }
1086            | Error::InvalidClusterInfoFormat { source, .. }
1087            | Error::InvalidDatanodeStatFormat { source, .. }
1088            | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1089            Error::InvalidateTableCache { source, .. } => source.status_code(),
1090            Error::SubmitProcedure { source, .. }
1091            | Error::WaitProcedure { source, .. }
1092            | Error::QueryProcedure { source, .. } => source.status_code(),
1093            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1094                source.status_code()
1095            }
1096            Error::StartProcedureManager { source, .. }
1097            | Error::StopProcedureManager { source, .. } => source.status_code(),
1098
1099            Error::ListCatalogs { source, .. }
1100            | Error::ListSchemas { source, .. }
1101            | Error::ListTables { source, .. } => source.status_code(),
1102            Error::StartTelemetryTask { source, .. } => source.status_code(),
1103
1104            Error::NextSequence { source, .. }
1105            | Error::SetNextSequence { source, .. }
1106            | Error::PeekSequence { source, .. } => source.status_code(),
1107            Error::DowngradeLeader { source, .. } => source.status_code(),
1108            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1109            Error::SubmitDdlTask { source, .. }
1110            | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1111            Error::ConvertProtoData { source, .. }
1112            | Error::TableMetadataManager { source, .. }
1113            | Error::RuntimeSwitchManager { source, .. }
1114            | Error::KvBackend { source, .. }
1115            | Error::UnexpectedLogicalRouteTable { source, .. }
1116            | Error::UpdateTopicNameValue { source, .. }
1117            | Error::ParseWalOptions { source, .. } => source.status_code(),
1118            Error::ListActiveFrontends { source, .. }
1119            | Error::ListActiveDatanodes { source, .. }
1120            | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1121            Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1122
1123            Error::InitMetadata { source, .. }
1124            | Error::InitDdlManager { source, .. }
1125            | Error::InitReconciliationManager { source, .. } => source.status_code(),
1126
1127            Error::BuildTlsOptions { source, .. } => source.status_code(),
1128            Error::Other { source, .. } => source.status_code(),
1129            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1130
1131            #[cfg(feature = "pg_kvbackend")]
1132            Error::CreatePostgresPool { .. }
1133            | Error::GetPostgresClient { .. }
1134            | Error::GetPostgresConnection { .. }
1135            | Error::PostgresExecution { .. } => StatusCode::Internal,
1136            #[cfg(feature = "mysql_kvbackend")]
1137            Error::MySqlExecution { .. }
1138            | Error::CreateMySqlPool { .. }
1139            | Error::ParseMySqlUrl { .. }
1140            | Error::DecodeSqlValue { .. }
1141            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1142            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1143            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1144        }
1145    }
1146
1147    fn as_any(&self) -> &dyn std::any::Any {
1148        self
1149    }
1150}
1151
1152// for form tonic
1153pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1154    let mut err: &(dyn std::error::Error + 'static) = err_status;
1155
1156    loop {
1157        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1158            return Some(io_err);
1159        }
1160
1161        // h2::Error do not expose std::io::Error with `source()`
1162        // https://github.com/hyperium/h2/pull/462
1163        if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1164            && let Some(io_err) = h2_err.get_io()
1165        {
1166            return Some(io_err);
1167        }
1168
1169        err = err.source()?;
1170    }
1171}