meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_procedure::ProcedureId;
21use common_runtime::JoinError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::metadata::TableId;
25use tokio::sync::mpsc::error::SendError;
26use tonic::codegen::http;
27use uuid::Uuid;
28
29use crate::metasrv::SelectTarget;
30use crate::pubsub::Message;
31use crate::service::mailbox::Channel;
32
33#[derive(Snafu)]
34#[snafu(visibility(pub))]
35#[stack_trace_debug]
36pub enum Error {
37    #[snafu(display("Failed to choose items"))]
38    ChooseItems {
39        #[snafu(implicit)]
40        location: Location,
41        #[snafu(source)]
42        error: rand::distr::weighted::Error,
43    },
44
45    #[snafu(display("Exceeded deadline, operation: {}", operation))]
46    ExceededDeadline {
47        #[snafu(implicit)]
48        location: Location,
49        operation: String,
50    },
51
52    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
53    PeerUnavailable {
54        #[snafu(implicit)]
55        location: Location,
56        peer_id: u64,
57    },
58
59    #[snafu(display("Failed to list active frontends"))]
60    ListActiveFrontends {
61        #[snafu(implicit)]
62        location: Location,
63        source: common_meta::error::Error,
64    },
65
66    #[snafu(display("Failed to list active datanodes"))]
67    ListActiveDatanodes {
68        #[snafu(implicit)]
69        location: Location,
70        source: common_meta::error::Error,
71    },
72
73    #[snafu(display("Failed to list active flownodes"))]
74    ListActiveFlownodes {
75        #[snafu(implicit)]
76        location: Location,
77        source: common_meta::error::Error,
78    },
79
80    #[snafu(display("No available frontend"))]
81    NoAvailableFrontend {
82        #[snafu(implicit)]
83        location: Location,
84    },
85
86    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
87    MigrationRunning {
88        #[snafu(implicit)]
89        location: Location,
90        region_id: RegionId,
91    },
92
93    #[snafu(display(
94        "The region migration procedure is completed for region: {}, target_peer: {}",
95        region_id,
96        target_peer_id
97    ))]
98    RegionMigrated {
99        #[snafu(implicit)]
100        location: Location,
101        region_id: RegionId,
102        target_peer_id: u64,
103    },
104
105    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
106    MigrationAbort {
107        #[snafu(implicit)]
108        location: Location,
109        reason: String,
110    },
111
112    #[snafu(display(
113        "Another procedure is operating the region: {} on peer: {}",
114        region_id,
115        peer_id
116    ))]
117    RegionOperatingRace {
118        #[snafu(implicit)]
119        location: Location,
120        peer_id: DatanodeId,
121        region_id: RegionId,
122    },
123
124    #[snafu(display("Failed to init ddl manager"))]
125    InitDdlManager {
126        #[snafu(implicit)]
127        location: Location,
128        source: common_meta::error::Error,
129    },
130
131    #[snafu(display("Failed to init reconciliation manager"))]
132    InitReconciliationManager {
133        #[snafu(implicit)]
134        location: Location,
135        source: common_meta::error::Error,
136    },
137
138    #[snafu(display("Failed to create default catalog and schema"))]
139    InitMetadata {
140        #[snafu(implicit)]
141        location: Location,
142        source: common_meta::error::Error,
143    },
144
145    #[snafu(display("Failed to allocate next sequence number"))]
146    NextSequence {
147        #[snafu(implicit)]
148        location: Location,
149        source: common_meta::error::Error,
150    },
151
152    #[snafu(display("Failed to set next sequence number"))]
153    SetNextSequence {
154        #[snafu(implicit)]
155        location: Location,
156        source: common_meta::error::Error,
157    },
158
159    #[snafu(display("Failed to peek sequence number"))]
160    PeekSequence {
161        #[snafu(implicit)]
162        location: Location,
163        source: common_meta::error::Error,
164    },
165
166    #[snafu(display("Failed to start telemetry task"))]
167    StartTelemetryTask {
168        #[snafu(implicit)]
169        location: Location,
170        source: common_runtime::error::Error,
171    },
172
173    #[snafu(display("Failed to submit ddl task"))]
174    SubmitDdlTask {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_meta::error::Error,
178    },
179
180    #[snafu(display("Failed to submit reconcile procedure"))]
181    SubmitReconcileProcedure {
182        #[snafu(implicit)]
183        location: Location,
184        source: common_meta::error::Error,
185    },
186
187    #[snafu(display("Failed to invalidate table cache"))]
188    InvalidateTableCache {
189        #[snafu(implicit)]
190        location: Location,
191        source: common_meta::error::Error,
192    },
193
194    #[snafu(display("Failed to list catalogs"))]
195    ListCatalogs {
196        #[snafu(implicit)]
197        location: Location,
198        source: BoxedError,
199    },
200
201    #[snafu(display("Failed to list {}'s schemas", catalog))]
202    ListSchemas {
203        #[snafu(implicit)]
204        location: Location,
205        catalog: String,
206        source: BoxedError,
207    },
208
209    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
210    ListTables {
211        #[snafu(implicit)]
212        location: Location,
213        catalog: String,
214        schema: String,
215        source: BoxedError,
216    },
217
218    #[snafu(display("Failed to join a future"))]
219    Join {
220        #[snafu(implicit)]
221        location: Location,
222        #[snafu(source)]
223        error: JoinError,
224    },
225
226    #[snafu(display(
227        "Failed to request {}, required: {}, but only {} available",
228        select_target,
229        required,
230        available
231    ))]
232    NoEnoughAvailableNode {
233        #[snafu(implicit)]
234        location: Location,
235        required: usize,
236        available: usize,
237        select_target: SelectTarget,
238    },
239
240    #[snafu(display("Failed to send shutdown signal"))]
241    SendShutdownSignal {
242        #[snafu(source)]
243        error: SendError<()>,
244    },
245
246    #[snafu(display("Failed to shutdown {} server", server))]
247    ShutdownServer {
248        #[snafu(implicit)]
249        location: Location,
250        source: servers::error::Error,
251        server: String,
252    },
253
254    #[snafu(display("Empty key is not allowed"))]
255    EmptyKey {
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to execute via Etcd"))]
261    EtcdFailed {
262        #[snafu(source)]
263        error: etcd_client::Error,
264        #[snafu(implicit)]
265        location: Location,
266    },
267
268    #[snafu(display("Failed to connect to Etcd"))]
269    ConnectEtcd {
270        #[snafu(source)]
271        error: etcd_client::Error,
272        #[snafu(implicit)]
273        location: Location,
274    },
275
276    #[snafu(display("Failed to read file: {}", path))]
277    FileIo {
278        #[snafu(source)]
279        error: std::io::Error,
280        #[snafu(implicit)]
281        location: Location,
282        path: String,
283    },
284
285    #[snafu(display("Failed to bind address {}", addr))]
286    TcpBind {
287        addr: String,
288        #[snafu(source)]
289        error: std::io::Error,
290        #[snafu(implicit)]
291        location: Location,
292    },
293
294    #[snafu(display("Failed to start gRPC server"))]
295    StartGrpc {
296        #[snafu(source)]
297        error: tonic::transport::Error,
298        #[snafu(implicit)]
299        location: Location,
300    },
301
302    #[snafu(display("Failed to start http server"))]
303    StartHttp {
304        #[snafu(implicit)]
305        location: Location,
306        source: servers::error::Error,
307    },
308
309    #[snafu(display("Failed to parse address {}", addr))]
310    ParseAddr {
311        addr: String,
312        #[snafu(source)]
313        error: std::net::AddrParseError,
314    },
315
316    #[snafu(display("Invalid lease key: {}", key))]
317    InvalidLeaseKey {
318        key: String,
319        #[snafu(implicit)]
320        location: Location,
321    },
322
323    #[snafu(display("Invalid datanode stat key: {}", key))]
324    InvalidStatKey {
325        key: String,
326        #[snafu(implicit)]
327        location: Location,
328    },
329
330    #[snafu(display("Invalid inactive region key: {}", key))]
331    InvalidInactiveRegionKey {
332        key: String,
333        #[snafu(implicit)]
334        location: Location,
335    },
336
337    #[snafu(display("Failed to parse lease key from utf8"))]
338    LeaseKeyFromUtf8 {
339        #[snafu(source)]
340        error: std::string::FromUtf8Error,
341        #[snafu(implicit)]
342        location: Location,
343    },
344
345    #[snafu(display("Failed to parse lease value from utf8"))]
346    LeaseValueFromUtf8 {
347        #[snafu(source)]
348        error: std::string::FromUtf8Error,
349        #[snafu(implicit)]
350        location: Location,
351    },
352
353    #[snafu(display("Failed to parse invalid region key from utf8"))]
354    InvalidRegionKeyFromUtf8 {
355        #[snafu(source)]
356        error: std::string::FromUtf8Error,
357        #[snafu(implicit)]
358        location: Location,
359    },
360
361    #[snafu(display("Failed to serialize to json: {}", input))]
362    SerializeToJson {
363        input: String,
364        #[snafu(source)]
365        error: serde_json::error::Error,
366        #[snafu(implicit)]
367        location: Location,
368    },
369
370    #[snafu(display("Failed to deserialize from json: {}", input))]
371    DeserializeFromJson {
372        input: String,
373        #[snafu(source)]
374        error: serde_json::error::Error,
375        #[snafu(implicit)]
376        location: Location,
377    },
378
379    #[snafu(display("Failed to parse number: {}", err_msg))]
380    ParseNum {
381        err_msg: String,
382        #[snafu(source)]
383        error: std::num::ParseIntError,
384        #[snafu(implicit)]
385        location: Location,
386    },
387
388    #[snafu(display("Failed to parse bool: {}", err_msg))]
389    ParseBool {
390        err_msg: String,
391        #[snafu(source)]
392        error: std::str::ParseBoolError,
393        #[snafu(implicit)]
394        location: Location,
395    },
396
397    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
398    DowngradeLeader {
399        region_id: RegionId,
400        #[snafu(implicit)]
401        location: Location,
402        #[snafu(source)]
403        source: BoxedError,
404    },
405
406    #[snafu(display("Region's leader peer changed: {}", msg))]
407    LeaderPeerChanged {
408        msg: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Invalid arguments: {}", err_msg))]
414    InvalidArguments {
415        err_msg: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[cfg(feature = "mysql_kvbackend")]
421    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
422    ParseMySqlUrl {
423        #[snafu(source)]
424        error: sqlx::error::Error,
425        mysql_url: String,
426        #[snafu(implicit)]
427        location: Location,
428    },
429
430    #[cfg(feature = "mysql_kvbackend")]
431    #[snafu(display("Failed to decode sql value"))]
432    DecodeSqlValue {
433        #[snafu(source)]
434        error: sqlx::error::Error,
435        #[snafu(implicit)]
436        location: Location,
437    },
438
439    #[snafu(display("Failed to find table route for {table_id}"))]
440    TableRouteNotFound {
441        table_id: TableId,
442        #[snafu(implicit)]
443        location: Location,
444    },
445
446    #[snafu(display("Failed to find table route for {region_id}"))]
447    RegionRouteNotFound {
448        region_id: RegionId,
449        #[snafu(implicit)]
450        location: Location,
451    },
452
453    #[snafu(display("Table info not found: {}", table_id))]
454    TableInfoNotFound {
455        table_id: TableId,
456        #[snafu(implicit)]
457        location: Location,
458    },
459
460    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
461    DatanodeTableNotFound {
462        table_id: TableId,
463        datanode_id: DatanodeId,
464        #[snafu(implicit)]
465        location: Location,
466    },
467
468    #[snafu(display("Metasrv has no leader at this moment"))]
469    NoLeader {
470        #[snafu(implicit)]
471        location: Location,
472    },
473
474    #[snafu(display("Leader lease expired"))]
475    LeaderLeaseExpired {
476        #[snafu(implicit)]
477        location: Location,
478    },
479
480    #[snafu(display("Leader lease changed during election"))]
481    LeaderLeaseChanged {
482        #[snafu(implicit)]
483        location: Location,
484    },
485
486    #[snafu(display("Table {} not found", name))]
487    TableNotFound {
488        name: String,
489        #[snafu(implicit)]
490        location: Location,
491    },
492
493    #[snafu(display("Unsupported selector type, {}", selector_type))]
494    UnsupportedSelectorType {
495        selector_type: String,
496        #[snafu(implicit)]
497        location: Location,
498    },
499
500    #[snafu(display("Unexpected, violated: {violated}"))]
501    Unexpected {
502        violated: String,
503        #[snafu(implicit)]
504        location: Location,
505    },
506
507    #[snafu(display("Failed to create gRPC channel"))]
508    CreateChannel {
509        #[snafu(implicit)]
510        location: Location,
511        source: common_grpc::error::Error,
512    },
513
514    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
515    BatchGet {
516        #[snafu(source)]
517        error: tonic::Status,
518        #[snafu(implicit)]
519        location: Location,
520    },
521
522    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
523    Range {
524        #[snafu(source)]
525        error: tonic::Status,
526        #[snafu(implicit)]
527        location: Location,
528    },
529
530    #[snafu(display("Response header not found"))]
531    ResponseHeaderNotFound {
532        #[snafu(implicit)]
533        location: Location,
534    },
535
536    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
537    IsNotLeader {
538        node_addr: String,
539        #[snafu(implicit)]
540        location: Location,
541    },
542
543    #[snafu(display("Invalid http body"))]
544    InvalidHttpBody {
545        #[snafu(source)]
546        error: http::Error,
547        #[snafu(implicit)]
548        location: Location,
549    },
550
551    #[snafu(display(
552        "The number of retries for the grpc call {} exceeded the limit, {}",
553        func_name,
554        retry_num
555    ))]
556    ExceededRetryLimit {
557        func_name: String,
558        retry_num: usize,
559        #[snafu(implicit)]
560        location: Location,
561    },
562
563    #[snafu(display("Invalid utf-8 value"))]
564    InvalidUtf8Value {
565        #[snafu(source)]
566        error: std::string::FromUtf8Error,
567        #[snafu(implicit)]
568        location: Location,
569    },
570
571    #[snafu(display("Missing required parameter, param: {:?}", param))]
572    MissingRequiredParameter { param: String },
573
574    #[snafu(display("Failed to start procedure manager"))]
575    StartProcedureManager {
576        #[snafu(implicit)]
577        location: Location,
578        source: common_procedure::Error,
579    },
580
581    #[snafu(display("Failed to stop procedure manager"))]
582    StopProcedureManager {
583        #[snafu(implicit)]
584        location: Location,
585        source: common_procedure::Error,
586    },
587
588    #[snafu(display("Failed to wait procedure done"))]
589    WaitProcedure {
590        #[snafu(implicit)]
591        location: Location,
592        source: common_procedure::Error,
593    },
594
595    #[snafu(display("Failed to query procedure state"))]
596    QueryProcedure {
597        #[snafu(implicit)]
598        location: Location,
599        source: common_procedure::Error,
600    },
601
602    #[snafu(display("Procedure not found: {pid}"))]
603    ProcedureNotFound {
604        #[snafu(implicit)]
605        location: Location,
606        pid: String,
607    },
608
609    #[snafu(display("Failed to submit procedure"))]
610    SubmitProcedure {
611        #[snafu(implicit)]
612        location: Location,
613        source: common_procedure::Error,
614    },
615
616    #[snafu(display("A prune task for topic {} is already running", topic))]
617    PruneTaskAlreadyRunning {
618        topic: String,
619        #[snafu(implicit)]
620        location: Location,
621    },
622
623    #[snafu(display("Schema already exists, name: {schema_name}"))]
624    SchemaAlreadyExists {
625        schema_name: String,
626        #[snafu(implicit)]
627        location: Location,
628    },
629
630    #[snafu(display("Table already exists: {table_name}"))]
631    TableAlreadyExists {
632        table_name: String,
633        #[snafu(implicit)]
634        location: Location,
635    },
636
637    #[snafu(display("Pusher not found: {pusher_id}"))]
638    PusherNotFound {
639        pusher_id: String,
640        #[snafu(implicit)]
641        location: Location,
642    },
643
644    #[snafu(display("Failed to push message: {err_msg}"))]
645    PushMessage {
646        err_msg: String,
647        #[snafu(implicit)]
648        location: Location,
649    },
650
651    #[snafu(display("Mailbox already closed: {id}"))]
652    MailboxClosed {
653        id: u64,
654        #[snafu(implicit)]
655        location: Location,
656    },
657
658    #[snafu(display("Mailbox timeout: {id}"))]
659    MailboxTimeout {
660        id: u64,
661        #[snafu(implicit)]
662        location: Location,
663    },
664
665    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
666    MailboxReceiver {
667        id: u64,
668        err_msg: String,
669        #[snafu(implicit)]
670        location: Location,
671    },
672
673    #[snafu(display("Mailbox channel closed: {channel}"))]
674    MailboxChannelClosed {
675        channel: Channel,
676        #[snafu(implicit)]
677        location: Location,
678    },
679
680    #[snafu(display("Missing request header"))]
681    MissingRequestHeader {
682        #[snafu(implicit)]
683        location: Location,
684    },
685
686    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
687    RegisterProcedureLoader {
688        type_name: String,
689        #[snafu(implicit)]
690        location: Location,
691        source: common_procedure::error::Error,
692    },
693
694    #[snafu(display(
695        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
696        mailbox_message,
697        reason
698    ))]
699    UnexpectedInstructionReply {
700        mailbox_message: String,
701        reason: String,
702        #[snafu(implicit)]
703        location: Location,
704    },
705
706    #[snafu(display("Expected to retry later, reason: {}", reason))]
707    RetryLater {
708        reason: String,
709        #[snafu(implicit)]
710        location: Location,
711    },
712
713    #[snafu(display("Expected to retry later, reason: {}", reason))]
714    RetryLaterWithSource {
715        reason: String,
716        #[snafu(implicit)]
717        location: Location,
718        source: BoxedError,
719    },
720
721    #[snafu(display("Failed to convert proto data"))]
722    ConvertProtoData {
723        #[snafu(implicit)]
724        location: Location,
725        source: common_meta::error::Error,
726    },
727
728    // this error is used for custom error mapping
729    // please do not delete it
730    #[snafu(display("Other error"))]
731    Other {
732        source: BoxedError,
733        #[snafu(implicit)]
734        location: Location,
735    },
736
737    #[snafu(display("Table metadata manager error"))]
738    TableMetadataManager {
739        source: common_meta::error::Error,
740        #[snafu(implicit)]
741        location: Location,
742    },
743
744    #[snafu(display("Runtime switch manager error"))]
745    RuntimeSwitchManager {
746        source: common_meta::error::Error,
747        #[snafu(implicit)]
748        location: Location,
749    },
750
751    #[snafu(display("Keyvalue backend error"))]
752    KvBackend {
753        source: common_meta::error::Error,
754        #[snafu(implicit)]
755        location: Location,
756    },
757
758    #[snafu(display("Failed to publish message"))]
759    PublishMessage {
760        #[snafu(source)]
761        error: SendError<Message>,
762        #[snafu(implicit)]
763        location: Location,
764    },
765
766    #[snafu(display("Too many partitions"))]
767    TooManyPartitions {
768        #[snafu(implicit)]
769        location: Location,
770    },
771
772    #[snafu(display("Failed to create repartition subtasks"))]
773    RepartitionCreateSubtasks {
774        source: partition::error::Error,
775        #[snafu(implicit)]
776        location: Location,
777    },
778
779    #[snafu(display(
780        "Source partition expression '{}' does not match any existing region",
781        expr
782    ))]
783    RepartitionSourceExprMismatch {
784        expr: String,
785        #[snafu(implicit)]
786        location: Location,
787    },
788
789    #[snafu(display(
790        "Failed to get the state receiver for repartition subprocedure {}",
791        procedure_id
792    ))]
793    RepartitionSubprocedureStateReceiver {
794        procedure_id: ProcedureId,
795        #[snafu(source)]
796        source: common_procedure::Error,
797        #[snafu(implicit)]
798        location: Location,
799    },
800
801    #[snafu(display("Unsupported operation {}", operation))]
802    Unsupported {
803        operation: String,
804        #[snafu(implicit)]
805        location: Location,
806    },
807
808    #[snafu(display("Unexpected table route type: {}", err_msg))]
809    UnexpectedLogicalRouteTable {
810        #[snafu(implicit)]
811        location: Location,
812        err_msg: String,
813        source: common_meta::error::Error,
814    },
815
816    #[snafu(display("Failed to save cluster info"))]
817    SaveClusterInfo {
818        #[snafu(implicit)]
819        location: Location,
820        source: common_meta::error::Error,
821    },
822
823    #[snafu(display("Invalid cluster info format"))]
824    InvalidClusterInfoFormat {
825        #[snafu(implicit)]
826        location: Location,
827        source: common_meta::error::Error,
828    },
829
830    #[snafu(display("Invalid datanode stat format"))]
831    InvalidDatanodeStatFormat {
832        #[snafu(implicit)]
833        location: Location,
834        source: common_meta::error::Error,
835    },
836
837    #[snafu(display("Invalid node info format"))]
838    InvalidNodeInfoFormat {
839        #[snafu(implicit)]
840        location: Location,
841        source: common_meta::error::Error,
842    },
843
844    #[snafu(display("Failed to serialize options to TOML"))]
845    TomlFormat {
846        #[snafu(implicit)]
847        location: Location,
848        #[snafu(source(from(common_config::error::Error, Box::new)))]
849        source: Box<common_config::error::Error>,
850    },
851
852    #[cfg(feature = "pg_kvbackend")]
853    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
854    PostgresExecution {
855        #[snafu(source)]
856        error: tokio_postgres::Error,
857        sql: String,
858        #[snafu(implicit)]
859        location: Location,
860    },
861
862    #[cfg(feature = "pg_kvbackend")]
863    #[snafu(display("Failed to get Postgres client"))]
864    GetPostgresClient {
865        #[snafu(implicit)]
866        location: Location,
867        #[snafu(source)]
868        error: deadpool::managed::PoolError<tokio_postgres::Error>,
869    },
870
871    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
872    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
873    SqlExecutionTimeout {
874        #[snafu(implicit)]
875        location: Location,
876        sql: String,
877        duration: std::time::Duration,
878    },
879
880    #[cfg(feature = "pg_kvbackend")]
881    #[snafu(display("Failed to create connection pool for Postgres"))]
882    CreatePostgresPool {
883        #[snafu(source)]
884        error: deadpool_postgres::CreatePoolError,
885        #[snafu(implicit)]
886        location: Location,
887    },
888
889    #[cfg(feature = "pg_kvbackend")]
890    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
891    GetPostgresConnection {
892        reason: String,
893        #[snafu(implicit)]
894        location: Location,
895    },
896
897    #[cfg(feature = "mysql_kvbackend")]
898    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
899    MySqlExecution {
900        #[snafu(source)]
901        error: sqlx::Error,
902        #[snafu(implicit)]
903        location: Location,
904        sql: String,
905    },
906
907    #[cfg(feature = "mysql_kvbackend")]
908    #[snafu(display("Failed to create mysql pool"))]
909    CreateMySqlPool {
910        #[snafu(source)]
911        error: sqlx::Error,
912        #[snafu(implicit)]
913        location: Location,
914    },
915
916    #[cfg(feature = "mysql_kvbackend")]
917    #[snafu(display("Failed to acquire mysql client from pool"))]
918    AcquireMySqlClient {
919        #[snafu(source)]
920        error: sqlx::Error,
921        #[snafu(implicit)]
922        location: Location,
923    },
924
925    #[snafu(display("Handler not found: {}", name))]
926    HandlerNotFound {
927        name: String,
928        #[snafu(implicit)]
929        location: Location,
930    },
931
932    #[snafu(display("Flow state handler error"))]
933    FlowStateHandler {
934        #[snafu(implicit)]
935        location: Location,
936        source: common_meta::error::Error,
937    },
938
939    #[snafu(display("Failed to build wal options allocator"))]
940    BuildWalOptionsAllocator {
941        #[snafu(implicit)]
942        location: Location,
943        source: common_meta::error::Error,
944    },
945
946    #[snafu(display("Failed to parse wal options"))]
947    ParseWalOptions {
948        #[snafu(implicit)]
949        location: Location,
950        source: common_meta::error::Error,
951    },
952
953    #[snafu(display("Failed to build kafka client."))]
954    BuildKafkaClient {
955        #[snafu(implicit)]
956        location: Location,
957        #[snafu(source)]
958        error: common_meta::error::Error,
959    },
960
961    #[snafu(display(
962        "Failed to build a Kafka partition client, topic: {}, partition: {}",
963        topic,
964        partition
965    ))]
966    BuildPartitionClient {
967        topic: String,
968        partition: i32,
969        #[snafu(implicit)]
970        location: Location,
971        #[snafu(source)]
972        error: rskafka::client::error::Error,
973    },
974
975    #[snafu(display(
976        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
977        topic,
978        partition,
979        offset
980    ))]
981    DeleteRecords {
982        #[snafu(implicit)]
983        location: Location,
984        #[snafu(source)]
985        error: rskafka::client::error::Error,
986        topic: String,
987        partition: i32,
988        offset: u64,
989    },
990
991    #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
992    GetOffset {
993        topic: String,
994        #[snafu(implicit)]
995        location: Location,
996        #[snafu(source)]
997        error: rskafka::client::error::Error,
998    },
999
1000    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
1001    UpdateTopicNameValue {
1002        topic: String,
1003        #[snafu(implicit)]
1004        location: Location,
1005        #[snafu(source)]
1006        source: common_meta::error::Error,
1007    },
1008
1009    #[snafu(display("Failed to build tls options"))]
1010    BuildTlsOptions {
1011        #[snafu(implicit)]
1012        location: Location,
1013        #[snafu(source)]
1014        source: common_meta::error::Error,
1015    },
1016
1017    #[snafu(display(
1018        "Repartition group {} source region missing, region id: {}",
1019        group_id,
1020        region_id
1021    ))]
1022    RepartitionSourceRegionMissing {
1023        group_id: Uuid,
1024        region_id: RegionId,
1025        #[snafu(implicit)]
1026        location: Location,
1027    },
1028
1029    #[snafu(display(
1030        "Repartition group {} target region missing, region id: {}",
1031        group_id,
1032        region_id
1033    ))]
1034    RepartitionTargetRegionMissing {
1035        group_id: Uuid,
1036        region_id: RegionId,
1037        #[snafu(implicit)]
1038        location: Location,
1039    },
1040
1041    #[snafu(display("Failed to serialize partition expression: {}", source))]
1042    SerializePartitionExpr {
1043        #[snafu(source)]
1044        source: partition::error::Error,
1045        #[snafu(implicit)]
1046        location: Location,
1047    },
1048
1049    #[snafu(display(
1050        "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1051        region_id,
1052        expected,
1053        actual
1054    ))]
1055    PartitionExprMismatch {
1056        region_id: RegionId,
1057        expected: String,
1058        actual: String,
1059        #[snafu(implicit)]
1060        location: Location,
1061    },
1062
1063    #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
1064    DeallocateRegions {
1065        #[snafu(implicit)]
1066        location: Location,
1067        table_id: TableId,
1068        #[snafu(source)]
1069        source: common_meta::error::Error,
1070    },
1071}
1072
1073impl Error {
1074    /// Returns `true` if the error is retryable.
1075    pub fn is_retryable(&self) -> bool {
1076        matches!(
1077            self,
1078            Error::RetryLater { .. }
1079                | Error::RetryLaterWithSource { .. }
1080                | Error::MailboxTimeout { .. }
1081        )
1082    }
1083}
1084
/// Shorthand for `std::result::Result` with the error type fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

// Invokes `common_error::define_into_tonic_status!` for `Error`. Judging by the macro's
// name it generates the `Error` -> `tonic::Status` conversion.
// NOTE(review): confirm against the macro definition, which is not part of this file.
define_into_tonic_status!(Error);
1088
1089impl ErrorExt for Error {
1090    fn status_code(&self) -> StatusCode {
1091        match self {
1092            Error::EtcdFailed { .. }
1093            | Error::ConnectEtcd { .. }
1094            | Error::FileIo { .. }
1095            | Error::TcpBind { .. }
1096            | Error::SerializeToJson { .. }
1097            | Error::DeserializeFromJson { .. }
1098            | Error::NoLeader { .. }
1099            | Error::LeaderLeaseExpired { .. }
1100            | Error::LeaderLeaseChanged { .. }
1101            | Error::CreateChannel { .. }
1102            | Error::BatchGet { .. }
1103            | Error::Range { .. }
1104            | Error::ResponseHeaderNotFound { .. }
1105            | Error::InvalidHttpBody { .. }
1106            | Error::ExceededRetryLimit { .. }
1107            | Error::SendShutdownSignal { .. }
1108            | Error::PushMessage { .. }
1109            | Error::MailboxClosed { .. }
1110            | Error::MailboxReceiver { .. }
1111            | Error::StartGrpc { .. }
1112            | Error::PublishMessage { .. }
1113            | Error::Join { .. }
1114            | Error::ChooseItems { .. }
1115            | Error::FlowStateHandler { .. }
1116            | Error::BuildWalOptionsAllocator { .. }
1117            | Error::BuildPartitionClient { .. }
1118            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1119
1120            Error::DeleteRecords { .. }
1121            | Error::GetOffset { .. }
1122            | Error::PeerUnavailable { .. }
1123            | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1124            Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1125            Error::PruneTaskAlreadyRunning { .. }
1126            | Error::RetryLater { .. }
1127            | Error::MailboxChannelClosed { .. }
1128            | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1129            Error::RetryLaterWithSource { source, .. } => source.status_code(),
1130            Error::SerializePartitionExpr { source, .. } => source.status_code(),
1131
1132            Error::Unsupported { .. } => StatusCode::Unsupported,
1133
1134            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1135
1136            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1137            Error::EmptyKey { .. }
1138            | Error::MissingRequiredParameter { .. }
1139            | Error::MissingRequestHeader { .. }
1140            | Error::InvalidLeaseKey { .. }
1141            | Error::InvalidStatKey { .. }
1142            | Error::InvalidInactiveRegionKey { .. }
1143            | Error::ParseNum { .. }
1144            | Error::ParseBool { .. }
1145            | Error::ParseAddr { .. }
1146            | Error::UnsupportedSelectorType { .. }
1147            | Error::InvalidArguments { .. }
1148            | Error::ProcedureNotFound { .. }
1149            | Error::TooManyPartitions { .. }
1150            | Error::TomlFormat { .. }
1151            | Error::HandlerNotFound { .. }
1152            | Error::LeaderPeerChanged { .. }
1153            | Error::RepartitionSourceRegionMissing { .. }
1154            | Error::RepartitionTargetRegionMissing { .. }
1155            | Error::PartitionExprMismatch { .. }
1156            | Error::RepartitionSourceExprMismatch { .. } => StatusCode::InvalidArguments,
1157            Error::LeaseKeyFromUtf8 { .. }
1158            | Error::LeaseValueFromUtf8 { .. }
1159            | Error::InvalidRegionKeyFromUtf8 { .. }
1160            | Error::TableRouteNotFound { .. }
1161            | Error::TableInfoNotFound { .. }
1162            | Error::DatanodeTableNotFound { .. }
1163            | Error::InvalidUtf8Value { .. }
1164            | Error::UnexpectedInstructionReply { .. }
1165            | Error::Unexpected { .. }
1166            | Error::RegionOperatingRace { .. }
1167            | Error::RegionRouteNotFound { .. }
1168            | Error::MigrationAbort { .. }
1169            | Error::MigrationRunning { .. }
1170            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1171            Error::TableNotFound { .. } => StatusCode::TableNotFound,
1172            Error::SaveClusterInfo { source, .. }
1173            | Error::InvalidClusterInfoFormat { source, .. }
1174            | Error::InvalidDatanodeStatFormat { source, .. }
1175            | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1176            Error::InvalidateTableCache { source, .. } => source.status_code(),
1177            Error::SubmitProcedure { source, .. }
1178            | Error::WaitProcedure { source, .. }
1179            | Error::QueryProcedure { source, .. } => source.status_code(),
1180            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1181                source.status_code()
1182            }
1183            Error::StartProcedureManager { source, .. }
1184            | Error::StopProcedureManager { source, .. } => source.status_code(),
1185
1186            Error::ListCatalogs { source, .. }
1187            | Error::ListSchemas { source, .. }
1188            | Error::ListTables { source, .. } => source.status_code(),
1189            Error::StartTelemetryTask { source, .. } => source.status_code(),
1190
1191            Error::NextSequence { source, .. }
1192            | Error::SetNextSequence { source, .. }
1193            | Error::PeekSequence { source, .. } => source.status_code(),
1194            Error::DowngradeLeader { source, .. } => source.status_code(),
1195            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1196            Error::SubmitDdlTask { source, .. }
1197            | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1198            Error::ConvertProtoData { source, .. }
1199            | Error::TableMetadataManager { source, .. }
1200            | Error::RuntimeSwitchManager { source, .. }
1201            | Error::KvBackend { source, .. }
1202            | Error::UnexpectedLogicalRouteTable { source, .. }
1203            | Error::UpdateTopicNameValue { source, .. }
1204            | Error::ParseWalOptions { source, .. } => source.status_code(),
1205            Error::ListActiveFrontends { source, .. }
1206            | Error::ListActiveDatanodes { source, .. }
1207            | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1208            Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1209
1210            Error::InitMetadata { source, .. }
1211            | Error::InitDdlManager { source, .. }
1212            | Error::InitReconciliationManager { source, .. } => source.status_code(),
1213
1214            Error::BuildTlsOptions { source, .. } => source.status_code(),
1215            Error::Other { source, .. } => source.status_code(),
1216            Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
1217            Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
1218            Error::DeallocateRegions { source, .. } => source.status_code(),
1219            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1220
1221            #[cfg(feature = "pg_kvbackend")]
1222            Error::CreatePostgresPool { .. }
1223            | Error::GetPostgresClient { .. }
1224            | Error::GetPostgresConnection { .. }
1225            | Error::PostgresExecution { .. } => StatusCode::Internal,
1226            #[cfg(feature = "mysql_kvbackend")]
1227            Error::MySqlExecution { .. }
1228            | Error::CreateMySqlPool { .. }
1229            | Error::ParseMySqlUrl { .. }
1230            | Error::DecodeSqlValue { .. }
1231            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1232            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1233            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1234        }
1235    }
1236
1237    fn as_any(&self) -> &dyn std::any::Any {
1238        self
1239    }
1240}
1241
1242// for form tonic
1243pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1244    let mut err: &(dyn std::error::Error + 'static) = err_status;
1245
1246    loop {
1247        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1248            return Some(io_err);
1249        }
1250
1251        // h2::Error do not expose std::io::Error with `source()`
1252        // https://github.com/hyperium/h2/pull/462
1253        if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1254            && let Some(io_err) = h2_err.get_io()
1255        {
1256            return Some(io_err);
1257        }
1258
1259        err = err.source()?;
1260    }
1261}