meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_runtime::JoinError;
21use snafu::{Location, Snafu};
22use store_api::storage::RegionId;
23use table::metadata::TableId;
24use tokio::sync::mpsc::error::SendError;
25use tonic::codegen::http;
26
27use crate::metasrv::SelectTarget;
28use crate::pubsub::Message;
29use crate::service::mailbox::Channel;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Failed to choose items"))]
36    ChooseItems {
37        #[snafu(implicit)]
38        location: Location,
39        #[snafu(source)]
40        error: rand::distr::weighted::Error,
41    },
42
43    #[snafu(display("Exceeded deadline, operation: {}", operation))]
44    ExceededDeadline {
45        #[snafu(implicit)]
46        location: Location,
47        operation: String,
48    },
49
50    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
51    PeerUnavailable {
52        #[snafu(implicit)]
53        location: Location,
54        peer_id: u64,
55    },
56
57    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
58    MigrationRunning {
59        #[snafu(implicit)]
60        location: Location,
61        region_id: RegionId,
62    },
63
64    #[snafu(display(
65        "The region migration procedure is completed for region: {}, target_peer: {}",
66        region_id,
67        target_peer_id
68    ))]
69    RegionMigrated {
70        #[snafu(implicit)]
71        location: Location,
72        region_id: RegionId,
73        target_peer_id: u64,
74    },
75
76    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
77    MigrationAbort {
78        #[snafu(implicit)]
79        location: Location,
80        reason: String,
81    },
82
83    #[snafu(display(
84        "Another procedure is opening the region: {} on peer: {}",
85        region_id,
86        peer_id
87    ))]
88    RegionOpeningRace {
89        #[snafu(implicit)]
90        location: Location,
91        peer_id: DatanodeId,
92        region_id: RegionId,
93    },
94
95    #[snafu(display("Failed to init ddl manager"))]
96    InitDdlManager {
97        #[snafu(implicit)]
98        location: Location,
99        source: common_meta::error::Error,
100    },
101
102    #[snafu(display("Failed to create default catalog and schema"))]
103    InitMetadata {
104        #[snafu(implicit)]
105        location: Location,
106        source: common_meta::error::Error,
107    },
108
109    #[snafu(display("Failed to allocate next sequence number"))]
110    NextSequence {
111        #[snafu(implicit)]
112        location: Location,
113        source: common_meta::error::Error,
114    },
115
116    #[snafu(display("Failed to start telemetry task"))]
117    StartTelemetryTask {
118        #[snafu(implicit)]
119        location: Location,
120        source: common_runtime::error::Error,
121    },
122
123    #[snafu(display("Failed to submit ddl task"))]
124    SubmitDdlTask {
125        #[snafu(implicit)]
126        location: Location,
127        source: common_meta::error::Error,
128    },
129
130    #[snafu(display("Failed to invalidate table cache"))]
131    InvalidateTableCache {
132        #[snafu(implicit)]
133        location: Location,
134        source: common_meta::error::Error,
135    },
136
137    #[snafu(display("Failed to list catalogs"))]
138    ListCatalogs {
139        #[snafu(implicit)]
140        location: Location,
141        source: BoxedError,
142    },
143
144    #[snafu(display("Failed to list {}'s schemas", catalog))]
145    ListSchemas {
146        #[snafu(implicit)]
147        location: Location,
148        catalog: String,
149        source: BoxedError,
150    },
151
152    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
153    ListTables {
154        #[snafu(implicit)]
155        location: Location,
156        catalog: String,
157        schema: String,
158        source: BoxedError,
159    },
160
161    #[snafu(display("Failed to join a future"))]
162    Join {
163        #[snafu(implicit)]
164        location: Location,
165        #[snafu(source)]
166        error: JoinError,
167    },
168
169    #[snafu(display(
170        "Failed to request {}, required: {}, but only {} available",
171        select_target,
172        required,
173        available
174    ))]
175    NoEnoughAvailableNode {
176        #[snafu(implicit)]
177        location: Location,
178        required: usize,
179        available: usize,
180        select_target: SelectTarget,
181    },
182
183    #[snafu(display("Failed to send shutdown signal"))]
184    SendShutdownSignal {
185        #[snafu(source)]
186        error: SendError<()>,
187    },
188
189    #[snafu(display("Failed to shutdown {} server", server))]
190    ShutdownServer {
191        #[snafu(implicit)]
192        location: Location,
193        source: servers::error::Error,
194        server: String,
195    },
196
197    #[snafu(display("Empty key is not allowed"))]
198    EmptyKey {
199        #[snafu(implicit)]
200        location: Location,
201    },
202
203    #[snafu(display("Failed to execute via Etcd"))]
204    EtcdFailed {
205        #[snafu(source)]
206        error: etcd_client::Error,
207        #[snafu(implicit)]
208        location: Location,
209    },
210
211    #[snafu(display("Failed to connect to Etcd"))]
212    ConnectEtcd {
213        #[snafu(source)]
214        error: etcd_client::Error,
215        #[snafu(implicit)]
216        location: Location,
217    },
218
219    #[snafu(display("Failed to bind address {}", addr))]
220    TcpBind {
221        addr: String,
222        #[snafu(source)]
223        error: std::io::Error,
224        #[snafu(implicit)]
225        location: Location,
226    },
227
228    #[snafu(display("Failed to convert to TcpIncoming"))]
229    TcpIncoming {
230        #[snafu(source)]
231        error: Box<dyn std::error::Error + Send + Sync>,
232    },
233
234    #[snafu(display("Failed to start gRPC server"))]
235    StartGrpc {
236        #[snafu(source)]
237        error: tonic::transport::Error,
238        #[snafu(implicit)]
239        location: Location,
240    },
241
242    #[snafu(display("Failed to start http server"))]
243    StartHttp {
244        #[snafu(implicit)]
245        location: Location,
246        source: servers::error::Error,
247    },
248
249    #[snafu(display("Failed to init export metrics task"))]
250    InitExportMetricsTask {
251        #[snafu(implicit)]
252        location: Location,
253        source: servers::error::Error,
254    },
255
256    #[snafu(display("Failed to parse address {}", addr))]
257    ParseAddr {
258        addr: String,
259        #[snafu(source)]
260        error: std::net::AddrParseError,
261    },
262
263    #[snafu(display("Invalid lease key: {}", key))]
264    InvalidLeaseKey {
265        key: String,
266        #[snafu(implicit)]
267        location: Location,
268    },
269
270    #[snafu(display("Invalid datanode stat key: {}", key))]
271    InvalidStatKey {
272        key: String,
273        #[snafu(implicit)]
274        location: Location,
275    },
276
277    #[snafu(display("Invalid inactive region key: {}", key))]
278    InvalidInactiveRegionKey {
279        key: String,
280        #[snafu(implicit)]
281        location: Location,
282    },
283
284    #[snafu(display("Failed to parse lease key from utf8"))]
285    LeaseKeyFromUtf8 {
286        #[snafu(source)]
287        error: std::string::FromUtf8Error,
288        #[snafu(implicit)]
289        location: Location,
290    },
291
292    #[snafu(display("Failed to parse lease value from utf8"))]
293    LeaseValueFromUtf8 {
294        #[snafu(source)]
295        error: std::string::FromUtf8Error,
296        #[snafu(implicit)]
297        location: Location,
298    },
299
300    #[snafu(display("Failed to parse invalid region key from utf8"))]
301    InvalidRegionKeyFromUtf8 {
302        #[snafu(source)]
303        error: std::string::FromUtf8Error,
304        #[snafu(implicit)]
305        location: Location,
306    },
307
308    #[snafu(display("Failed to serialize to json: {}", input))]
309    SerializeToJson {
310        input: String,
311        #[snafu(source)]
312        error: serde_json::error::Error,
313        #[snafu(implicit)]
314        location: Location,
315    },
316
317    #[snafu(display("Failed to deserialize from json: {}", input))]
318    DeserializeFromJson {
319        input: String,
320        #[snafu(source)]
321        error: serde_json::error::Error,
322        #[snafu(implicit)]
323        location: Location,
324    },
325
326    #[snafu(display("Failed to parse number: {}", err_msg))]
327    ParseNum {
328        err_msg: String,
329        #[snafu(source)]
330        error: std::num::ParseIntError,
331        #[snafu(implicit)]
332        location: Location,
333    },
334
335    #[snafu(display("Failed to parse bool: {}", err_msg))]
336    ParseBool {
337        err_msg: String,
338        #[snafu(source)]
339        error: std::str::ParseBoolError,
340        #[snafu(implicit)]
341        location: Location,
342    },
343
344    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
345    DowngradeLeader {
346        region_id: RegionId,
347        #[snafu(implicit)]
348        location: Location,
349        #[snafu(source)]
350        source: BoxedError,
351    },
352
353    #[snafu(display("Region's leader peer changed: {}", msg))]
354    LeaderPeerChanged {
355        msg: String,
356        #[snafu(implicit)]
357        location: Location,
358    },
359
360    #[snafu(display("Invalid arguments: {}", err_msg))]
361    InvalidArguments {
362        err_msg: String,
363        #[snafu(implicit)]
364        location: Location,
365    },
366
367    #[cfg(feature = "mysql_kvbackend")]
368    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
369    ParseMySqlUrl {
370        #[snafu(source)]
371        error: sqlx::error::Error,
372        mysql_url: String,
373        #[snafu(implicit)]
374        location: Location,
375    },
376
377    #[cfg(feature = "mysql_kvbackend")]
378    #[snafu(display("Failed to decode sql value"))]
379    DecodeSqlValue {
380        #[snafu(source)]
381        error: sqlx::error::Error,
382        #[snafu(implicit)]
383        location: Location,
384    },
385
386    #[snafu(display("Failed to find table route for {table_id}"))]
387    TableRouteNotFound {
388        table_id: TableId,
389        #[snafu(implicit)]
390        location: Location,
391    },
392
393    #[snafu(display("Failed to find table route for {region_id}"))]
394    RegionRouteNotFound {
395        region_id: RegionId,
396        #[snafu(implicit)]
397        location: Location,
398    },
399
400    #[snafu(display("Table info not found: {}", table_id))]
401    TableInfoNotFound {
402        table_id: TableId,
403        #[snafu(implicit)]
404        location: Location,
405    },
406
407    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
408    DatanodeTableNotFound {
409        table_id: TableId,
410        datanode_id: DatanodeId,
411        #[snafu(implicit)]
412        location: Location,
413    },
414
415    #[snafu(display("Metasrv has no leader at this moment"))]
416    NoLeader {
417        #[snafu(implicit)]
418        location: Location,
419    },
420
421    #[snafu(display("Leader lease expired"))]
422    LeaderLeaseExpired {
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Leader lease changed during election"))]
428    LeaderLeaseChanged {
429        #[snafu(implicit)]
430        location: Location,
431    },
432
433    #[snafu(display("Table {} not found", name))]
434    TableNotFound {
435        name: String,
436        #[snafu(implicit)]
437        location: Location,
438    },
439
440    #[snafu(display("Unsupported selector type, {}", selector_type))]
441    UnsupportedSelectorType {
442        selector_type: String,
443        #[snafu(implicit)]
444        location: Location,
445    },
446
447    #[snafu(display("Unexpected, violated: {violated}"))]
448    Unexpected {
449        violated: String,
450        #[snafu(implicit)]
451        location: Location,
452    },
453
454    #[snafu(display("Failed to create gRPC channel"))]
455    CreateChannel {
456        #[snafu(implicit)]
457        location: Location,
458        source: common_grpc::error::Error,
459    },
460
461    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
462    BatchGet {
463        #[snafu(source)]
464        error: tonic::Status,
465        #[snafu(implicit)]
466        location: Location,
467    },
468
469    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
470    Range {
471        #[snafu(source)]
472        error: tonic::Status,
473        #[snafu(implicit)]
474        location: Location,
475    },
476
477    #[snafu(display("Response header not found"))]
478    ResponseHeaderNotFound {
479        #[snafu(implicit)]
480        location: Location,
481    },
482
483    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
484    IsNotLeader {
485        node_addr: String,
486        #[snafu(implicit)]
487        location: Location,
488    },
489
490    #[snafu(display("Invalid http body"))]
491    InvalidHttpBody {
492        #[snafu(source)]
493        error: http::Error,
494        #[snafu(implicit)]
495        location: Location,
496    },
497
498    #[snafu(display(
499        "The number of retries for the grpc call {} exceeded the limit, {}",
500        func_name,
501        retry_num
502    ))]
503    ExceededRetryLimit {
504        func_name: String,
505        retry_num: usize,
506        #[snafu(implicit)]
507        location: Location,
508    },
509
510    #[snafu(display("Invalid utf-8 value"))]
511    InvalidUtf8Value {
512        #[snafu(source)]
513        error: std::string::FromUtf8Error,
514        #[snafu(implicit)]
515        location: Location,
516    },
517
518    #[snafu(display("Missing required parameter, param: {:?}", param))]
519    MissingRequiredParameter { param: String },
520
521    #[snafu(display("Failed to start procedure manager"))]
522    StartProcedureManager {
523        #[snafu(implicit)]
524        location: Location,
525        source: common_procedure::Error,
526    },
527
528    #[snafu(display("Failed to stop procedure manager"))]
529    StopProcedureManager {
530        #[snafu(implicit)]
531        location: Location,
532        source: common_procedure::Error,
533    },
534
535    #[snafu(display("Failed to wait procedure done"))]
536    WaitProcedure {
537        #[snafu(implicit)]
538        location: Location,
539        source: common_procedure::Error,
540    },
541
542    #[snafu(display("Failed to query procedure state"))]
543    QueryProcedure {
544        #[snafu(implicit)]
545        location: Location,
546        source: common_procedure::Error,
547    },
548
549    #[snafu(display("Procedure not found: {pid}"))]
550    ProcedureNotFound {
551        #[snafu(implicit)]
552        location: Location,
553        pid: String,
554    },
555
556    #[snafu(display("Failed to submit procedure"))]
557    SubmitProcedure {
558        #[snafu(implicit)]
559        location: Location,
560        source: common_procedure::Error,
561    },
562
563    #[snafu(display("A prune task for topic {} is already running", topic))]
564    PruneTaskAlreadyRunning {
565        topic: String,
566        #[snafu(implicit)]
567        location: Location,
568    },
569
570    #[snafu(display("Schema already exists, name: {schema_name}"))]
571    SchemaAlreadyExists {
572        schema_name: String,
573        #[snafu(implicit)]
574        location: Location,
575    },
576
577    #[snafu(display("Table already exists: {table_name}"))]
578    TableAlreadyExists {
579        table_name: String,
580        #[snafu(implicit)]
581        location: Location,
582    },
583
584    #[snafu(display("Pusher not found: {pusher_id}"))]
585    PusherNotFound {
586        pusher_id: String,
587        #[snafu(implicit)]
588        location: Location,
589    },
590
591    #[snafu(display("Failed to push message: {err_msg}"))]
592    PushMessage {
593        err_msg: String,
594        #[snafu(implicit)]
595        location: Location,
596    },
597
598    #[snafu(display("Mailbox already closed: {id}"))]
599    MailboxClosed {
600        id: u64,
601        #[snafu(implicit)]
602        location: Location,
603    },
604
605    #[snafu(display("Mailbox timeout: {id}"))]
606    MailboxTimeout {
607        id: u64,
608        #[snafu(implicit)]
609        location: Location,
610    },
611
612    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
613    MailboxReceiver {
614        id: u64,
615        err_msg: String,
616        #[snafu(implicit)]
617        location: Location,
618    },
619
620    #[snafu(display("Mailbox channel closed: {channel}"))]
621    MailboxChannelClosed {
622        channel: Channel,
623        #[snafu(implicit)]
624        location: Location,
625    },
626
627    #[snafu(display("Missing request header"))]
628    MissingRequestHeader {
629        #[snafu(implicit)]
630        location: Location,
631    },
632
633    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
634    RegisterProcedureLoader {
635        type_name: String,
636        #[snafu(implicit)]
637        location: Location,
638        source: common_procedure::error::Error,
639    },
640
641    #[snafu(display(
642        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
643        mailbox_message,
644        reason
645    ))]
646    UnexpectedInstructionReply {
647        mailbox_message: String,
648        reason: String,
649        #[snafu(implicit)]
650        location: Location,
651    },
652
653    #[snafu(display("Expected to retry later, reason: {}", reason))]
654    RetryLater {
655        reason: String,
656        #[snafu(implicit)]
657        location: Location,
658    },
659
660    #[snafu(display("Expected to retry later, reason: {}", reason))]
661    RetryLaterWithSource {
662        reason: String,
663        #[snafu(implicit)]
664        location: Location,
665        source: BoxedError,
666    },
667
668    #[snafu(display("Failed to convert proto data"))]
669    ConvertProtoData {
670        #[snafu(implicit)]
671        location: Location,
672        source: common_meta::error::Error,
673    },
674
675    // this error is used for custom error mapping
676    // please do not delete it
677    #[snafu(display("Other error"))]
678    Other {
679        source: BoxedError,
680        #[snafu(implicit)]
681        location: Location,
682    },
683
684    #[snafu(display("Table metadata manager error"))]
685    TableMetadataManager {
686        source: common_meta::error::Error,
687        #[snafu(implicit)]
688        location: Location,
689    },
690
691    #[snafu(display("Runtime switch manager error"))]
692    RuntimeSwitchManager {
693        source: common_meta::error::Error,
694        #[snafu(implicit)]
695        location: Location,
696    },
697
698    #[snafu(display("Keyvalue backend error"))]
699    KvBackend {
700        source: common_meta::error::Error,
701        #[snafu(implicit)]
702        location: Location,
703    },
704
705    #[snafu(display("Failed to publish message"))]
706    PublishMessage {
707        #[snafu(source)]
708        error: SendError<Message>,
709        #[snafu(implicit)]
710        location: Location,
711    },
712
713    #[snafu(display("Too many partitions"))]
714    TooManyPartitions {
715        #[snafu(implicit)]
716        location: Location,
717    },
718
719    #[snafu(display("Unsupported operation {}", operation))]
720    Unsupported {
721        operation: String,
722        #[snafu(implicit)]
723        location: Location,
724    },
725
726    #[snafu(display("Unexpected table route type: {}", err_msg))]
727    UnexpectedLogicalRouteTable {
728        #[snafu(implicit)]
729        location: Location,
730        err_msg: String,
731        source: common_meta::error::Error,
732    },
733
734    #[snafu(display("Failed to save cluster info"))]
735    SaveClusterInfo {
736        #[snafu(implicit)]
737        location: Location,
738        source: common_meta::error::Error,
739    },
740
741    #[snafu(display("Invalid cluster info format"))]
742    InvalidClusterInfoFormat {
743        #[snafu(implicit)]
744        location: Location,
745        source: common_meta::error::Error,
746    },
747
748    #[snafu(display("Invalid datanode stat format"))]
749    InvalidDatanodeStatFormat {
750        #[snafu(implicit)]
751        location: Location,
752        source: common_meta::error::Error,
753    },
754
755    #[snafu(display("Failed to serialize options to TOML"))]
756    TomlFormat {
757        #[snafu(implicit)]
758        location: Location,
759        #[snafu(source(from(common_config::error::Error, Box::new)))]
760        source: Box<common_config::error::Error>,
761    },
762
763    #[cfg(feature = "pg_kvbackend")]
764    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
765    PostgresExecution {
766        #[snafu(source)]
767        error: tokio_postgres::Error,
768        sql: String,
769        #[snafu(implicit)]
770        location: Location,
771    },
772
773    #[cfg(feature = "pg_kvbackend")]
774    #[snafu(display("Failed to get Postgres client"))]
775    GetPostgresClient {
776        #[snafu(implicit)]
777        location: Location,
778        #[snafu(source)]
779        error: deadpool::managed::PoolError<tokio_postgres::Error>,
780    },
781
782    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
783    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
784    SqlExecutionTimeout {
785        #[snafu(implicit)]
786        location: Location,
787        sql: String,
788        duration: std::time::Duration,
789    },
790
791    #[cfg(feature = "pg_kvbackend")]
792    #[snafu(display("Failed to create connection pool for Postgres"))]
793    CreatePostgresPool {
794        #[snafu(source)]
795        error: deadpool_postgres::CreatePoolError,
796        #[snafu(implicit)]
797        location: Location,
798    },
799
800    #[cfg(feature = "pg_kvbackend")]
801    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
802    GetPostgresConnection {
803        reason: String,
804        #[snafu(implicit)]
805        location: Location,
806    },
807
808    #[cfg(feature = "mysql_kvbackend")]
809    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
810    MySqlExecution {
811        #[snafu(source)]
812        error: sqlx::Error,
813        #[snafu(implicit)]
814        location: Location,
815        sql: String,
816    },
817
818    #[cfg(feature = "mysql_kvbackend")]
819    #[snafu(display("Failed to create mysql pool"))]
820    CreateMySqlPool {
821        #[snafu(source)]
822        error: sqlx::Error,
823        #[snafu(implicit)]
824        location: Location,
825    },
826
827    #[cfg(feature = "mysql_kvbackend")]
828    #[snafu(display("Failed to acquire mysql client from pool"))]
829    AcquireMySqlClient {
830        #[snafu(source)]
831        error: sqlx::Error,
832        #[snafu(implicit)]
833        location: Location,
834    },
835
836    #[snafu(display("Handler not found: {}", name))]
837    HandlerNotFound {
838        name: String,
839        #[snafu(implicit)]
840        location: Location,
841    },
842
843    #[snafu(display("Flow state handler error"))]
844    FlowStateHandler {
845        #[snafu(implicit)]
846        location: Location,
847        source: common_meta::error::Error,
848    },
849
850    #[snafu(display("Failed to build wal options allocator"))]
851    BuildWalOptionsAllocator {
852        #[snafu(implicit)]
853        location: Location,
854        source: common_meta::error::Error,
855    },
856
857    #[snafu(display("Failed to build kafka client."))]
858    BuildKafkaClient {
859        #[snafu(implicit)]
860        location: Location,
861        #[snafu(source)]
862        error: common_meta::error::Error,
863    },
864
865    #[snafu(display(
866        "Failed to build a Kafka partition client, topic: {}, partition: {}",
867        topic,
868        partition
869    ))]
870    BuildPartitionClient {
871        topic: String,
872        partition: i32,
873        #[snafu(implicit)]
874        location: Location,
875        #[snafu(source)]
876        error: rskafka::client::error::Error,
877    },
878
879    #[snafu(display(
880        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
881        topic,
882        partition,
883        offset
884    ))]
885    DeleteRecords {
886        #[snafu(implicit)]
887        location: Location,
888        #[snafu(source)]
889        error: rskafka::client::error::Error,
890        topic: String,
891        partition: i32,
892        offset: u64,
893    },
894
895    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
896    UpdateTopicNameValue {
897        topic: String,
898        #[snafu(implicit)]
899        location: Location,
900        #[snafu(source)]
901        source: common_meta::error::Error,
902    },
903}
904
905impl Error {
906    /// Returns `true` if the error is retryable.
907    pub fn is_retryable(&self) -> bool {
908        matches!(self, Error::RetryLater { .. })
909            || matches!(self, Error::RetryLaterWithSource { .. })
910    }
911}
912
913pub type Result<T> = std::result::Result<T, Error>;
914
915define_into_tonic_status!(Error);
916
917impl ErrorExt for Error {
918    fn status_code(&self) -> StatusCode {
919        match self {
920            Error::EtcdFailed { .. }
921            | Error::ConnectEtcd { .. }
922            | Error::TcpBind { .. }
923            | Error::TcpIncoming { .. }
924            | Error::SerializeToJson { .. }
925            | Error::DeserializeFromJson { .. }
926            | Error::NoLeader { .. }
927            | Error::LeaderLeaseExpired { .. }
928            | Error::LeaderLeaseChanged { .. }
929            | Error::CreateChannel { .. }
930            | Error::BatchGet { .. }
931            | Error::Range { .. }
932            | Error::ResponseHeaderNotFound { .. }
933            | Error::IsNotLeader { .. }
934            | Error::InvalidHttpBody { .. }
935            | Error::ExceededRetryLimit { .. }
936            | Error::SendShutdownSignal { .. }
937            | Error::PusherNotFound { .. }
938            | Error::PushMessage { .. }
939            | Error::MailboxClosed { .. }
940            | Error::MailboxTimeout { .. }
941            | Error::MailboxReceiver { .. }
942            | Error::MailboxChannelClosed { .. }
943            | Error::RetryLater { .. }
944            | Error::RetryLaterWithSource { .. }
945            | Error::StartGrpc { .. }
946            | Error::PublishMessage { .. }
947            | Error::Join { .. }
948            | Error::PeerUnavailable { .. }
949            | Error::ExceededDeadline { .. }
950            | Error::ChooseItems { .. }
951            | Error::FlowStateHandler { .. }
952            | Error::BuildWalOptionsAllocator { .. }
953            | Error::BuildPartitionClient { .. }
954            | Error::BuildKafkaClient { .. }
955            | Error::DeleteRecords { .. }
956            | Error::PruneTaskAlreadyRunning { .. } => StatusCode::Internal,
957
958            Error::Unsupported { .. } => StatusCode::Unsupported,
959
960            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
961
962            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
963            Error::EmptyKey { .. }
964            | Error::MissingRequiredParameter { .. }
965            | Error::MissingRequestHeader { .. }
966            | Error::InvalidLeaseKey { .. }
967            | Error::InvalidStatKey { .. }
968            | Error::InvalidInactiveRegionKey { .. }
969            | Error::ParseNum { .. }
970            | Error::ParseBool { .. }
971            | Error::ParseAddr { .. }
972            | Error::UnsupportedSelectorType { .. }
973            | Error::InvalidArguments { .. }
974            | Error::InitExportMetricsTask { .. }
975            | Error::ProcedureNotFound { .. }
976            | Error::TooManyPartitions { .. }
977            | Error::TomlFormat { .. }
978            | Error::HandlerNotFound { .. }
979            | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
980            Error::LeaseKeyFromUtf8 { .. }
981            | Error::LeaseValueFromUtf8 { .. }
982            | Error::InvalidRegionKeyFromUtf8 { .. }
983            | Error::TableRouteNotFound { .. }
984            | Error::TableInfoNotFound { .. }
985            | Error::DatanodeTableNotFound { .. }
986            | Error::InvalidUtf8Value { .. }
987            | Error::UnexpectedInstructionReply { .. }
988            | Error::Unexpected { .. }
989            | Error::RegionOpeningRace { .. }
990            | Error::RegionRouteNotFound { .. }
991            | Error::MigrationAbort { .. }
992            | Error::MigrationRunning { .. }
993            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
994            Error::TableNotFound { .. } => StatusCode::TableNotFound,
995            Error::SaveClusterInfo { source, .. }
996            | Error::InvalidClusterInfoFormat { source, .. }
997            | Error::InvalidDatanodeStatFormat { source, .. } => source.status_code(),
998            Error::InvalidateTableCache { source, .. } => source.status_code(),
999            Error::SubmitProcedure { source, .. }
1000            | Error::WaitProcedure { source, .. }
1001            | Error::QueryProcedure { source, .. } => source.status_code(),
1002            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1003                source.status_code()
1004            }
1005            Error::StartProcedureManager { source, .. }
1006            | Error::StopProcedureManager { source, .. } => source.status_code(),
1007
1008            Error::ListCatalogs { source, .. }
1009            | Error::ListSchemas { source, .. }
1010            | Error::ListTables { source, .. } => source.status_code(),
1011            Error::StartTelemetryTask { source, .. } => source.status_code(),
1012
1013            Error::NextSequence { source, .. } => source.status_code(),
1014            Error::DowngradeLeader { source, .. } => source.status_code(),
1015            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1016            Error::SubmitDdlTask { source, .. } => source.status_code(),
1017            Error::ConvertProtoData { source, .. }
1018            | Error::TableMetadataManager { source, .. }
1019            | Error::RuntimeSwitchManager { source, .. }
1020            | Error::KvBackend { source, .. }
1021            | Error::UnexpectedLogicalRouteTable { source, .. }
1022            | Error::UpdateTopicNameValue { source, .. } => source.status_code(),
1023
1024            Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
1025                source.status_code()
1026            }
1027
1028            Error::Other { source, .. } => source.status_code(),
1029            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1030
1031            #[cfg(feature = "pg_kvbackend")]
1032            Error::CreatePostgresPool { .. }
1033            | Error::GetPostgresClient { .. }
1034            | Error::GetPostgresConnection { .. }
1035            | Error::PostgresExecution { .. } => StatusCode::Internal,
1036            #[cfg(feature = "mysql_kvbackend")]
1037            Error::MySqlExecution { .. }
1038            | Error::CreateMySqlPool { .. }
1039            | Error::ParseMySqlUrl { .. }
1040            | Error::DecodeSqlValue { .. }
1041            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1042            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1043            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1044        }
1045    }
1046
1047    fn as_any(&self) -> &dyn std::any::Any {
1048        self
1049    }
1050}
1051
1052// for form tonic
1053pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1054    let mut err: &(dyn std::error::Error + 'static) = err_status;
1055
1056    loop {
1057        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1058            return Some(io_err);
1059        }
1060
1061        // h2::Error do not expose std::io::Error with `source()`
1062        // https://github.com/hyperium/h2/pull/462
1063        if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
1064            if let Some(io_err) = h2_err.get_io() {
1065                return Some(io_err);
1066            }
1067        }
1068
1069        err = err.source()?;
1070    }
1071}