meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Failed to choose items"))]
36    ChooseItems {
37        #[snafu(implicit)]
38        location: Location,
39        #[snafu(source)]
40        error: rand::distr::weighted::Error,
41    },
42
43    #[snafu(display("Exceeded deadline, operation: {}", operation))]
44    ExceededDeadline {
45        #[snafu(implicit)]
46        location: Location,
47        operation: String,
48    },
49
50    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51    PeerUnavailable {
52        #[snafu(implicit)]
53        location: Location,
54        peer_id: u64,
55    },
56
57    #[snafu(display("Failed to lookup frontends"))]
58    LookupFrontends {
59        #[snafu(implicit)]
60        location: Location,
61        source: common_meta::error::Error,
62    },
63
64    #[snafu(display("No available frontend"))]
65    NoAvailableFrontend {
66        #[snafu(implicit)]
67        location: Location,
68    },
69
70    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
71    MigrationRunning {
72        #[snafu(implicit)]
73        location: Location,
74        region_id: RegionId,
75    },
76
77    #[snafu(display(
78        "The region migration procedure is completed for region: {}, target_peer: {}",
79        region_id,
80        target_peer_id
81    ))]
82    RegionMigrated {
83        #[snafu(implicit)]
84        location: Location,
85        region_id: RegionId,
86        target_peer_id: u64,
87    },
88
89    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
90    MigrationAbort {
91        #[snafu(implicit)]
92        location: Location,
93        reason: String,
94    },
95
96    #[snafu(display(
97        "Another procedure is opening the region: {} on peer: {}",
98        region_id,
99        peer_id
100    ))]
101    RegionOpeningRace {
102        #[snafu(implicit)]
103        location: Location,
104        peer_id: DatanodeId,
105        region_id: RegionId,
106    },
107
108    #[snafu(display("Failed to init ddl manager"))]
109    InitDdlManager {
110        #[snafu(implicit)]
111        location: Location,
112        source: common_meta::error::Error,
113    },
114
115    #[snafu(display("Failed to init reconciliation manager"))]
116    InitReconciliationManager {
117        #[snafu(implicit)]
118        location: Location,
119        source: common_meta::error::Error,
120    },
121
122    #[snafu(display("Failed to create default catalog and schema"))]
123    InitMetadata {
124        #[snafu(implicit)]
125        location: Location,
126        source: common_meta::error::Error,
127    },
128
129    #[snafu(display("Failed to allocate next sequence number"))]
130    NextSequence {
131        #[snafu(implicit)]
132        location: Location,
133        source: common_meta::error::Error,
134    },
135
136    #[snafu(display("Failed to set next sequence number"))]
137    SetNextSequence {
138        #[snafu(implicit)]
139        location: Location,
140        source: common_meta::error::Error,
141    },
142
143    #[snafu(display("Failed to peek sequence number"))]
144    PeekSequence {
145        #[snafu(implicit)]
146        location: Location,
147        source: common_meta::error::Error,
148    },
149
150    #[snafu(display("Failed to start telemetry task"))]
151    StartTelemetryTask {
152        #[snafu(implicit)]
153        location: Location,
154        source: common_runtime::error::Error,
155    },
156
157    #[snafu(display("Failed to submit ddl task"))]
158    SubmitDdlTask {
159        #[snafu(implicit)]
160        location: Location,
161        source: common_meta::error::Error,
162    },
163
164    #[snafu(display("Failed to submit reconcile procedure"))]
165    SubmitReconcileProcedure {
166        #[snafu(implicit)]
167        location: Location,
168        source: common_meta::error::Error,
169    },
170
171    #[snafu(display("Failed to invalidate table cache"))]
172    InvalidateTableCache {
173        #[snafu(implicit)]
174        location: Location,
175        source: common_meta::error::Error,
176    },
177
178    #[snafu(display("Failed to list catalogs"))]
179    ListCatalogs {
180        #[snafu(implicit)]
181        location: Location,
182        source: BoxedError,
183    },
184
185    #[snafu(display("Failed to list {}'s schemas", catalog))]
186    ListSchemas {
187        #[snafu(implicit)]
188        location: Location,
189        catalog: String,
190        source: BoxedError,
191    },
192
193    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
194    ListTables {
195        #[snafu(implicit)]
196        location: Location,
197        catalog: String,
198        schema: String,
199        source: BoxedError,
200    },
201
202    #[snafu(display("Failed to join a future"))]
203    Join {
204        #[snafu(implicit)]
205        location: Location,
206        #[snafu(source)]
207        error: JoinError,
208    },
209
210    #[snafu(display(
211        "Failed to request {}, required: {}, but only {} available",
212        select_target,
213        required,
214        available
215    ))]
216    NoEnoughAvailableNode {
217        #[snafu(implicit)]
218        location: Location,
219        required: usize,
220        available: usize,
221        select_target: SelectTarget,
222    },
223
224    #[snafu(display("Failed to send shutdown signal"))]
225    SendShutdownSignal {
226        #[snafu(source)]
227        error: SendError<()>,
228    },
229
230    #[snafu(display("Failed to shutdown {} server", server))]
231    ShutdownServer {
232        #[snafu(implicit)]
233        location: Location,
234        source: servers::error::Error,
235        server: String,
236    },
237
238    #[snafu(display("Empty key is not allowed"))]
239    EmptyKey {
240        #[snafu(implicit)]
241        location: Location,
242    },
243
244    #[snafu(display("Failed to execute via Etcd"))]
245    EtcdFailed {
246        #[snafu(source)]
247        error: etcd_client::Error,
248        #[snafu(implicit)]
249        location: Location,
250    },
251
252    #[snafu(display("Failed to connect to Etcd"))]
253    ConnectEtcd {
254        #[snafu(source)]
255        error: etcd_client::Error,
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to read file: {}", path))]
261    FileIo {
262        #[snafu(source)]
263        error: std::io::Error,
264        #[snafu(implicit)]
265        location: Location,
266        path: String,
267    },
268
269    #[snafu(display("Failed to bind address {}", addr))]
270    TcpBind {
271        addr: String,
272        #[snafu(source)]
273        error: std::io::Error,
274        #[snafu(implicit)]
275        location: Location,
276    },
277
278    #[snafu(display("Failed to start gRPC server"))]
279    StartGrpc {
280        #[snafu(source)]
281        error: tonic::transport::Error,
282        #[snafu(implicit)]
283        location: Location,
284    },
285
286    #[snafu(display("Failed to start http server"))]
287    StartHttp {
288        #[snafu(implicit)]
289        location: Location,
290        source: servers::error::Error,
291    },
292
293    #[snafu(display("Failed to init export metrics task"))]
294    InitExportMetricsTask {
295        #[snafu(implicit)]
296        location: Location,
297        source: servers::error::Error,
298    },
299
300    #[snafu(display("Failed to parse address {}", addr))]
301    ParseAddr {
302        addr: String,
303        #[snafu(source)]
304        error: std::net::AddrParseError,
305    },
306
307    #[snafu(display("Invalid lease key: {}", key))]
308    InvalidLeaseKey {
309        key: String,
310        #[snafu(implicit)]
311        location: Location,
312    },
313
314    #[snafu(display("Invalid datanode stat key: {}", key))]
315    InvalidStatKey {
316        key: String,
317        #[snafu(implicit)]
318        location: Location,
319    },
320
321    #[snafu(display("Invalid inactive region key: {}", key))]
322    InvalidInactiveRegionKey {
323        key: String,
324        #[snafu(implicit)]
325        location: Location,
326    },
327
328    #[snafu(display("Failed to parse lease key from utf8"))]
329    LeaseKeyFromUtf8 {
330        #[snafu(source)]
331        error: std::string::FromUtf8Error,
332        #[snafu(implicit)]
333        location: Location,
334    },
335
336    #[snafu(display("Failed to parse lease value from utf8"))]
337    LeaseValueFromUtf8 {
338        #[snafu(source)]
339        error: std::string::FromUtf8Error,
340        #[snafu(implicit)]
341        location: Location,
342    },
343
344    #[snafu(display("Failed to parse invalid region key from utf8"))]
345    InvalidRegionKeyFromUtf8 {
346        #[snafu(source)]
347        error: std::string::FromUtf8Error,
348        #[snafu(implicit)]
349        location: Location,
350    },
351
352    #[snafu(display("Failed to serialize to json: {}", input))]
353    SerializeToJson {
354        input: String,
355        #[snafu(source)]
356        error: serde_json::error::Error,
357        #[snafu(implicit)]
358        location: Location,
359    },
360
361    #[snafu(display("Failed to deserialize from json: {}", input))]
362    DeserializeFromJson {
363        input: String,
364        #[snafu(source)]
365        error: serde_json::error::Error,
366        #[snafu(implicit)]
367        location: Location,
368    },
369
370    #[snafu(display("Failed to parse number: {}", err_msg))]
371    ParseNum {
372        err_msg: String,
373        #[snafu(source)]
374        error: std::num::ParseIntError,
375        #[snafu(implicit)]
376        location: Location,
377    },
378
379    #[snafu(display("Failed to parse bool: {}", err_msg))]
380    ParseBool {
381        err_msg: String,
382        #[snafu(source)]
383        error: std::str::ParseBoolError,
384        #[snafu(implicit)]
385        location: Location,
386    },
387
388    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
389    DowngradeLeader {
390        region_id: RegionId,
391        #[snafu(implicit)]
392        location: Location,
393        #[snafu(source)]
394        source: BoxedError,
395    },
396
397    #[snafu(display("Region's leader peer changed: {}", msg))]
398    LeaderPeerChanged {
399        msg: String,
400        #[snafu(implicit)]
401        location: Location,
402    },
403
404    #[snafu(display("Invalid arguments: {}", err_msg))]
405    InvalidArguments {
406        err_msg: String,
407        #[snafu(implicit)]
408        location: Location,
409    },
410
411    #[cfg(feature = "mysql_kvbackend")]
412    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
413    ParseMySqlUrl {
414        #[snafu(source)]
415        error: sqlx::error::Error,
416        mysql_url: String,
417        #[snafu(implicit)]
418        location: Location,
419    },
420
421    #[cfg(feature = "mysql_kvbackend")]
422    #[snafu(display("Failed to decode sql value"))]
423    DecodeSqlValue {
424        #[snafu(source)]
425        error: sqlx::error::Error,
426        #[snafu(implicit)]
427        location: Location,
428    },
429
430    #[snafu(display("Failed to find table route for {table_id}"))]
431    TableRouteNotFound {
432        table_id: TableId,
433        #[snafu(implicit)]
434        location: Location,
435    },
436
437    #[snafu(display("Failed to find table route for {region_id}"))]
438    RegionRouteNotFound {
439        region_id: RegionId,
440        #[snafu(implicit)]
441        location: Location,
442    },
443
444    #[snafu(display("Table info not found: {}", table_id))]
445    TableInfoNotFound {
446        table_id: TableId,
447        #[snafu(implicit)]
448        location: Location,
449    },
450
451    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
452    DatanodeTableNotFound {
453        table_id: TableId,
454        datanode_id: DatanodeId,
455        #[snafu(implicit)]
456        location: Location,
457    },
458
459    #[snafu(display("Metasrv has no leader at this moment"))]
460    NoLeader {
461        #[snafu(implicit)]
462        location: Location,
463    },
464
465    #[snafu(display("Leader lease expired"))]
466    LeaderLeaseExpired {
467        #[snafu(implicit)]
468        location: Location,
469    },
470
471    #[snafu(display("Leader lease changed during election"))]
472    LeaderLeaseChanged {
473        #[snafu(implicit)]
474        location: Location,
475    },
476
477    #[snafu(display("Table {} not found", name))]
478    TableNotFound {
479        name: String,
480        #[snafu(implicit)]
481        location: Location,
482    },
483
484    #[snafu(display("Unsupported selector type, {}", selector_type))]
485    UnsupportedSelectorType {
486        selector_type: String,
487        #[snafu(implicit)]
488        location: Location,
489    },
490
491    #[snafu(display("Unexpected, violated: {violated}"))]
492    Unexpected {
493        violated: String,
494        #[snafu(implicit)]
495        location: Location,
496    },
497
498    #[snafu(display("Failed to create gRPC channel"))]
499    CreateChannel {
500        #[snafu(implicit)]
501        location: Location,
502        source: common_grpc::error::Error,
503    },
504
505    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
506    BatchGet {
507        #[snafu(source)]
508        error: tonic::Status,
509        #[snafu(implicit)]
510        location: Location,
511    },
512
513    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
514    Range {
515        #[snafu(source)]
516        error: tonic::Status,
517        #[snafu(implicit)]
518        location: Location,
519    },
520
521    #[snafu(display("Response header not found"))]
522    ResponseHeaderNotFound {
523        #[snafu(implicit)]
524        location: Location,
525    },
526
527    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
528    IsNotLeader {
529        node_addr: String,
530        #[snafu(implicit)]
531        location: Location,
532    },
533
534    #[snafu(display("Invalid http body"))]
535    InvalidHttpBody {
536        #[snafu(source)]
537        error: http::Error,
538        #[snafu(implicit)]
539        location: Location,
540    },
541
542    #[snafu(display(
543        "The number of retries for the grpc call {} exceeded the limit, {}",
544        func_name,
545        retry_num
546    ))]
547    ExceededRetryLimit {
548        func_name: String,
549        retry_num: usize,
550        #[snafu(implicit)]
551        location: Location,
552    },
553
554    #[snafu(display("Invalid utf-8 value"))]
555    InvalidUtf8Value {
556        #[snafu(source)]
557        error: std::string::FromUtf8Error,
558        #[snafu(implicit)]
559        location: Location,
560    },
561
562    #[snafu(display("Missing required parameter, param: {:?}", param))]
563    MissingRequiredParameter { param: String },
564
565    #[snafu(display("Failed to start procedure manager"))]
566    StartProcedureManager {
567        #[snafu(implicit)]
568        location: Location,
569        source: common_procedure::Error,
570    },
571
572    #[snafu(display("Failed to stop procedure manager"))]
573    StopProcedureManager {
574        #[snafu(implicit)]
575        location: Location,
576        source: common_procedure::Error,
577    },
578
579    #[snafu(display("Failed to wait procedure done"))]
580    WaitProcedure {
581        #[snafu(implicit)]
582        location: Location,
583        source: common_procedure::Error,
584    },
585
586    #[snafu(display("Failed to query procedure state"))]
587    QueryProcedure {
588        #[snafu(implicit)]
589        location: Location,
590        source: common_procedure::Error,
591    },
592
593    #[snafu(display("Procedure not found: {pid}"))]
594    ProcedureNotFound {
595        #[snafu(implicit)]
596        location: Location,
597        pid: String,
598    },
599
600    #[snafu(display("Failed to submit procedure"))]
601    SubmitProcedure {
602        #[snafu(implicit)]
603        location: Location,
604        source: common_procedure::Error,
605    },
606
607    #[snafu(display("A prune task for topic {} is already running", topic))]
608    PruneTaskAlreadyRunning {
609        topic: String,
610        #[snafu(implicit)]
611        location: Location,
612    },
613
614    #[snafu(display("Schema already exists, name: {schema_name}"))]
615    SchemaAlreadyExists {
616        schema_name: String,
617        #[snafu(implicit)]
618        location: Location,
619    },
620
621    #[snafu(display("Table already exists: {table_name}"))]
622    TableAlreadyExists {
623        table_name: String,
624        #[snafu(implicit)]
625        location: Location,
626    },
627
628    #[snafu(display("Pusher not found: {pusher_id}"))]
629    PusherNotFound {
630        pusher_id: String,
631        #[snafu(implicit)]
632        location: Location,
633    },
634
635    #[snafu(display("Failed to push message: {err_msg}"))]
636    PushMessage {
637        err_msg: String,
638        #[snafu(implicit)]
639        location: Location,
640    },
641
642    #[snafu(display("Mailbox already closed: {id}"))]
643    MailboxClosed {
644        id: u64,
645        #[snafu(implicit)]
646        location: Location,
647    },
648
649    #[snafu(display("Mailbox timeout: {id}"))]
650    MailboxTimeout {
651        id: u64,
652        #[snafu(implicit)]
653        location: Location,
654    },
655
656    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
657    MailboxReceiver {
658        id: u64,
659        err_msg: String,
660        #[snafu(implicit)]
661        location: Location,
662    },
663
664    #[snafu(display("Mailbox channel closed: {channel}"))]
665    MailboxChannelClosed {
666        channel: Channel,
667        #[snafu(implicit)]
668        location: Location,
669    },
670
671    #[snafu(display("Missing request header"))]
672    MissingRequestHeader {
673        #[snafu(implicit)]
674        location: Location,
675    },
676
677    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
678    RegisterProcedureLoader {
679        type_name: String,
680        #[snafu(implicit)]
681        location: Location,
682        source: common_procedure::error::Error,
683    },
684
685    #[snafu(display(
686        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
687        mailbox_message,
688        reason
689    ))]
690    UnexpectedInstructionReply {
691        mailbox_message: String,
692        reason: String,
693        #[snafu(implicit)]
694        location: Location,
695    },
696
697    #[snafu(display("Expected to retry later, reason: {}", reason))]
698    RetryLater {
699        reason: String,
700        #[snafu(implicit)]
701        location: Location,
702    },
703
704    #[snafu(display("Expected to retry later, reason: {}", reason))]
705    RetryLaterWithSource {
706        reason: String,
707        #[snafu(implicit)]
708        location: Location,
709        source: BoxedError,
710    },
711
712    #[snafu(display("Failed to convert proto data"))]
713    ConvertProtoData {
714        #[snafu(implicit)]
715        location: Location,
716        source: common_meta::error::Error,
717    },
718
719    // this error is used for custom error mapping
720    // please do not delete it
721    #[snafu(display("Other error"))]
722    Other {
723        source: BoxedError,
724        #[snafu(implicit)]
725        location: Location,
726    },
727
728    #[snafu(display("Table metadata manager error"))]
729    TableMetadataManager {
730        source: common_meta::error::Error,
731        #[snafu(implicit)]
732        location: Location,
733    },
734
735    #[snafu(display("Runtime switch manager error"))]
736    RuntimeSwitchManager {
737        source: common_meta::error::Error,
738        #[snafu(implicit)]
739        location: Location,
740    },
741
742    #[snafu(display("Keyvalue backend error"))]
743    KvBackend {
744        source: common_meta::error::Error,
745        #[snafu(implicit)]
746        location: Location,
747    },
748
749    #[snafu(display("Failed to publish message"))]
750    PublishMessage {
751        #[snafu(source)]
752        error: SendError<Message>,
753        #[snafu(implicit)]
754        location: Location,
755    },
756
757    #[snafu(display("Too many partitions"))]
758    TooManyPartitions {
759        #[snafu(implicit)]
760        location: Location,
761    },
762
763    #[snafu(display("Unsupported operation {}", operation))]
764    Unsupported {
765        operation: String,
766        #[snafu(implicit)]
767        location: Location,
768    },
769
770    #[snafu(display("Unexpected table route type: {}", err_msg))]
771    UnexpectedLogicalRouteTable {
772        #[snafu(implicit)]
773        location: Location,
774        err_msg: String,
775        source: common_meta::error::Error,
776    },
777
778    #[snafu(display("Failed to save cluster info"))]
779    SaveClusterInfo {
780        #[snafu(implicit)]
781        location: Location,
782        source: common_meta::error::Error,
783    },
784
785    #[snafu(display("Invalid cluster info format"))]
786    InvalidClusterInfoFormat {
787        #[snafu(implicit)]
788        location: Location,
789        source: common_meta::error::Error,
790    },
791
792    #[snafu(display("Invalid datanode stat format"))]
793    InvalidDatanodeStatFormat {
794        #[snafu(implicit)]
795        location: Location,
796        source: common_meta::error::Error,
797    },
798
799    #[snafu(display("Failed to serialize options to TOML"))]
800    TomlFormat {
801        #[snafu(implicit)]
802        location: Location,
803        #[snafu(source(from(common_config::error::Error, Box::new)))]
804        source: Box<common_config::error::Error>,
805    },
806
807    #[cfg(feature = "pg_kvbackend")]
808    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
809    PostgresExecution {
810        #[snafu(source)]
811        error: tokio_postgres::Error,
812        sql: String,
813        #[snafu(implicit)]
814        location: Location,
815    },
816
817    #[cfg(feature = "pg_kvbackend")]
818    #[snafu(display("Failed to get Postgres client"))]
819    GetPostgresClient {
820        #[snafu(implicit)]
821        location: Location,
822        #[snafu(source)]
823        error: deadpool::managed::PoolError<tokio_postgres::Error>,
824    },
825
826    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
827    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
828    SqlExecutionTimeout {
829        #[snafu(implicit)]
830        location: Location,
831        sql: String,
832        duration: std::time::Duration,
833    },
834
835    #[cfg(feature = "pg_kvbackend")]
836    #[snafu(display("Failed to create connection pool for Postgres"))]
837    CreatePostgresPool {
838        #[snafu(source)]
839        error: deadpool_postgres::CreatePoolError,
840        #[snafu(implicit)]
841        location: Location,
842    },
843
844    #[cfg(feature = "pg_kvbackend")]
845    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
846    GetPostgresConnection {
847        reason: String,
848        #[snafu(implicit)]
849        location: Location,
850    },
851
852    #[cfg(feature = "mysql_kvbackend")]
853    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
854    MySqlExecution {
855        #[snafu(source)]
856        error: sqlx::Error,
857        #[snafu(implicit)]
858        location: Location,
859        sql: String,
860    },
861
862    #[cfg(feature = "mysql_kvbackend")]
863    #[snafu(display("Failed to create mysql pool"))]
864    CreateMySqlPool {
865        #[snafu(source)]
866        error: sqlx::Error,
867        #[snafu(implicit)]
868        location: Location,
869    },
870
871    #[cfg(feature = "mysql_kvbackend")]
872    #[snafu(display("Failed to acquire mysql client from pool"))]
873    AcquireMySqlClient {
874        #[snafu(source)]
875        error: sqlx::Error,
876        #[snafu(implicit)]
877        location: Location,
878    },
879
880    #[snafu(display("Handler not found: {}", name))]
881    HandlerNotFound {
882        name: String,
883        #[snafu(implicit)]
884        location: Location,
885    },
886
887    #[snafu(display("Flow state handler error"))]
888    FlowStateHandler {
889        #[snafu(implicit)]
890        location: Location,
891        source: common_meta::error::Error,
892    },
893
894    #[snafu(display("Failed to build wal options allocator"))]
895    BuildWalOptionsAllocator {
896        #[snafu(implicit)]
897        location: Location,
898        source: common_meta::error::Error,
899    },
900
901    #[snafu(display("Failed to parse wal options"))]
902    ParseWalOptions {
903        #[snafu(implicit)]
904        location: Location,
905        source: common_meta::error::Error,
906    },
907
908    #[snafu(display("Failed to build kafka client."))]
909    BuildKafkaClient {
910        #[snafu(implicit)]
911        location: Location,
912        #[snafu(source)]
913        error: common_meta::error::Error,
914    },
915
916    #[snafu(display(
917        "Failed to build a Kafka partition client, topic: {}, partition: {}",
918        topic,
919        partition
920    ))]
921    BuildPartitionClient {
922        topic: String,
923        partition: i32,
924        #[snafu(implicit)]
925        location: Location,
926        #[snafu(source)]
927        error: rskafka::client::error::Error,
928    },
929
930    #[snafu(display(
931        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
932        topic,
933        partition,
934        offset
935    ))]
936    DeleteRecords {
937        #[snafu(implicit)]
938        location: Location,
939        #[snafu(source)]
940        error: rskafka::client::error::Error,
941        topic: String,
942        partition: i32,
943        offset: u64,
944    },
945
946    #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
947    GetOffset {
948        topic: String,
949        #[snafu(implicit)]
950        location: Location,
951        #[snafu(source)]
952        error: rskafka::client::error::Error,
953    },
954
955    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
956    UpdateTopicNameValue {
957        topic: String,
958        #[snafu(implicit)]
959        location: Location,
960        #[snafu(source)]
961        source: common_meta::error::Error,
962    },
963}
964
965impl Error {
966    /// Returns `true` if the error is retryable.
967    pub fn is_retryable(&self) -> bool {
968        matches!(self, Error::RetryLater { .. })
969            || matches!(self, Error::RetryLaterWithSource { .. })
970    }
971}
972
973pub type Result<T> = std::result::Result<T, Error>;
974
975define_into_tonic_status!(Error);
976
977impl ErrorExt for Error {
978    fn status_code(&self) -> StatusCode {
979        match self {
980            Error::EtcdFailed { .. }
981            | Error::ConnectEtcd { .. }
982            | Error::FileIo { .. }
983            | Error::TcpBind { .. }
984            | Error::SerializeToJson { .. }
985            | Error::DeserializeFromJson { .. }
986            | Error::NoLeader { .. }
987            | Error::LeaderLeaseExpired { .. }
988            | Error::LeaderLeaseChanged { .. }
989            | Error::CreateChannel { .. }
990            | Error::BatchGet { .. }
991            | Error::Range { .. }
992            | Error::ResponseHeaderNotFound { .. }
993            | Error::InvalidHttpBody { .. }
994            | Error::ExceededRetryLimit { .. }
995            | Error::SendShutdownSignal { .. }
996            | Error::PushMessage { .. }
997            | Error::MailboxClosed { .. }
998            | Error::MailboxReceiver { .. }
999            | Error::StartGrpc { .. }
1000            | Error::PublishMessage { .. }
1001            | Error::Join { .. }
1002            | Error::ChooseItems { .. }
1003            | Error::FlowStateHandler { .. }
1004            | Error::BuildWalOptionsAllocator { .. }
1005            | Error::BuildPartitionClient { .. }
1006            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1007
1008            Error::DeleteRecords { .. }
1009            | Error::GetOffset { .. }
1010            | Error::PeerUnavailable { .. }
1011            | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1012            Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1013            Error::PruneTaskAlreadyRunning { .. }
1014            | Error::RetryLater { .. }
1015            | Error::MailboxChannelClosed { .. }
1016            | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1017            Error::RetryLaterWithSource { source, .. } => source.status_code(),
1018
1019            Error::Unsupported { .. } => StatusCode::Unsupported,
1020
1021            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1022
1023            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1024            Error::EmptyKey { .. }
1025            | Error::MissingRequiredParameter { .. }
1026            | Error::MissingRequestHeader { .. }
1027            | Error::InvalidLeaseKey { .. }
1028            | Error::InvalidStatKey { .. }
1029            | Error::InvalidInactiveRegionKey { .. }
1030            | Error::ParseNum { .. }
1031            | Error::ParseBool { .. }
1032            | Error::ParseAddr { .. }
1033            | Error::UnsupportedSelectorType { .. }
1034            | Error::InvalidArguments { .. }
1035            | Error::InitExportMetricsTask { .. }
1036            | Error::ProcedureNotFound { .. }
1037            | Error::TooManyPartitions { .. }
1038            | Error::TomlFormat { .. }
1039            | Error::HandlerNotFound { .. }
1040            | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
1041            Error::LeaseKeyFromUtf8 { .. }
1042            | Error::LeaseValueFromUtf8 { .. }
1043            | Error::InvalidRegionKeyFromUtf8 { .. }
1044            | Error::TableRouteNotFound { .. }
1045            | Error::TableInfoNotFound { .. }
1046            | Error::DatanodeTableNotFound { .. }
1047            | Error::InvalidUtf8Value { .. }
1048            | Error::UnexpectedInstructionReply { .. }
1049            | Error::Unexpected { .. }
1050            | Error::RegionOpeningRace { .. }
1051            | Error::RegionRouteNotFound { .. }
1052            | Error::MigrationAbort { .. }
1053            | Error::MigrationRunning { .. }
1054            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1055            Error::TableNotFound { .. } => StatusCode::TableNotFound,
1056            Error::SaveClusterInfo { source, .. }
1057            | Error::InvalidClusterInfoFormat { source, .. }
1058            | Error::InvalidDatanodeStatFormat { source, .. } => source.status_code(),
1059            Error::InvalidateTableCache { source, .. } => source.status_code(),
1060            Error::SubmitProcedure { source, .. }
1061            | Error::WaitProcedure { source, .. }
1062            | Error::QueryProcedure { source, .. } => source.status_code(),
1063            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1064                source.status_code()
1065            }
1066            Error::StartProcedureManager { source, .. }
1067            | Error::StopProcedureManager { source, .. } => source.status_code(),
1068
1069            Error::ListCatalogs { source, .. }
1070            | Error::ListSchemas { source, .. }
1071            | Error::ListTables { source, .. } => source.status_code(),
1072            Error::StartTelemetryTask { source, .. } => source.status_code(),
1073
1074            Error::NextSequence { source, .. }
1075            | Error::SetNextSequence { source, .. }
1076            | Error::PeekSequence { source, .. } => source.status_code(),
1077            Error::DowngradeLeader { source, .. } => source.status_code(),
1078            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1079            Error::SubmitDdlTask { source, .. }
1080            | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1081            Error::ConvertProtoData { source, .. }
1082            | Error::TableMetadataManager { source, .. }
1083            | Error::RuntimeSwitchManager { source, .. }
1084            | Error::KvBackend { source, .. }
1085            | Error::UnexpectedLogicalRouteTable { source, .. }
1086            | Error::UpdateTopicNameValue { source, .. }
1087            | Error::ParseWalOptions { source, .. } => source.status_code(),
1088            Error::LookupFrontends { source, .. } => source.status_code(),
1089            Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1090
1091            Error::InitMetadata { source, .. }
1092            | Error::InitDdlManager { source, .. }
1093            | Error::InitReconciliationManager { source, .. } => source.status_code(),
1094
1095            Error::Other { source, .. } => source.status_code(),
1096            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1097
1098            #[cfg(feature = "pg_kvbackend")]
1099            Error::CreatePostgresPool { .. }
1100            | Error::GetPostgresClient { .. }
1101            | Error::GetPostgresConnection { .. }
1102            | Error::PostgresExecution { .. } => StatusCode::Internal,
1103            #[cfg(feature = "mysql_kvbackend")]
1104            Error::MySqlExecution { .. }
1105            | Error::CreateMySqlPool { .. }
1106            | Error::ParseMySqlUrl { .. }
1107            | Error::DecodeSqlValue { .. }
1108            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1109            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1110            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1111        }
1112    }
1113
1114    fn as_any(&self) -> &dyn std::any::Any {
1115        self
1116    }
1117}
1118
1119// for form tonic
1120pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1121    let mut err: &(dyn std::error::Error + 'static) = err_status;
1122
1123    loop {
1124        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1125            return Some(io_err);
1126        }
1127
1128        // h2::Error do not expose std::io::Error with `source()`
1129        // https://github.com/hyperium/h2/pull/462
1130        if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
1131            if let Some(io_err) = h2_err.get_io() {
1132                return Some(io_err);
1133            }
1134        }
1135
1136        err = err.source()?;
1137    }
1138}