meta_srv/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::define_into_tonic_status;
16use common_error::ext::{BoxedError, ErrorExt};
17use common_error::status_code::StatusCode;
18use common_macro::stack_trace_debug;
19use common_meta::DatanodeId;
20use common_procedure::ProcedureId;
21use common_runtime::JoinError;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::metadata::TableId;
25use tokio::sync::mpsc::error::SendError;
26use tonic::codegen::http;
27use uuid::Uuid;
28
29use crate::metasrv::SelectTarget;
30use crate::pubsub::Message;
31use crate::service::mailbox::Channel;
32
33#[derive(Snafu)]
34#[snafu(visibility(pub))]
35#[stack_trace_debug]
36pub enum Error {
37    #[snafu(display("Failed to choose items"))]
38    ChooseItems {
39        #[snafu(implicit)]
40        location: Location,
41        #[snafu(source)]
42        error: rand::distr::weighted::Error,
43    },
44
45    #[snafu(display("Exceeded deadline, operation: {}", operation))]
46    ExceededDeadline {
47        #[snafu(implicit)]
48        location: Location,
49        operation: String,
50    },
51
52    #[snafu(display("The target peer is unavailable temporally: {}", peer_id))]
53    PeerUnavailable {
54        #[snafu(implicit)]
55        location: Location,
56        peer_id: u64,
57    },
58
59    #[snafu(display("Failed to list active frontends"))]
60    ListActiveFrontends {
61        #[snafu(implicit)]
62        location: Location,
63        source: common_meta::error::Error,
64    },
65
66    #[snafu(display("Failed to list active datanodes"))]
67    ListActiveDatanodes {
68        #[snafu(implicit)]
69        location: Location,
70        source: common_meta::error::Error,
71    },
72
73    #[snafu(display("Failed to list active flownodes"))]
74    ListActiveFlownodes {
75        #[snafu(implicit)]
76        location: Location,
77        source: common_meta::error::Error,
78    },
79
80    #[snafu(display("No available frontend"))]
81    NoAvailableFrontend {
82        #[snafu(implicit)]
83        location: Location,
84    },
85
86    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
87    MigrationRunning {
88        #[snafu(implicit)]
89        location: Location,
90        region_id: RegionId,
91    },
92
93    #[snafu(display(
94        "The region migration procedure is completed for region: {}, target_peer: {}",
95        region_id,
96        target_peer_id
97    ))]
98    RegionMigrated {
99        #[snafu(implicit)]
100        location: Location,
101        region_id: RegionId,
102        target_peer_id: u64,
103    },
104
105    #[snafu(display("The region migration procedure aborted, reason: {}", reason))]
106    MigrationAbort {
107        #[snafu(implicit)]
108        location: Location,
109        reason: String,
110    },
111
112    #[snafu(display(
113        "Another procedure is operating the region: {} on peer: {}",
114        region_id,
115        peer_id
116    ))]
117    RegionOperatingRace {
118        #[snafu(implicit)]
119        location: Location,
120        peer_id: DatanodeId,
121        region_id: RegionId,
122    },
123
124    #[snafu(display("Failed to init ddl manager"))]
125    InitDdlManager {
126        #[snafu(implicit)]
127        location: Location,
128        source: common_meta::error::Error,
129    },
130
131    #[snafu(display("Failed to init reconciliation manager"))]
132    InitReconciliationManager {
133        #[snafu(implicit)]
134        location: Location,
135        source: common_meta::error::Error,
136    },
137
138    #[snafu(display("Failed to create default catalog and schema"))]
139    InitMetadata {
140        #[snafu(implicit)]
141        location: Location,
142        source: common_meta::error::Error,
143    },
144
145    #[snafu(display("Failed to allocate next sequence number"))]
146    NextSequence {
147        #[snafu(implicit)]
148        location: Location,
149        source: common_meta::error::Error,
150    },
151
152    #[snafu(display("Failed to set next sequence number"))]
153    SetNextSequence {
154        #[snafu(implicit)]
155        location: Location,
156        source: common_meta::error::Error,
157    },
158
159    #[snafu(display("Failed to peek sequence number"))]
160    PeekSequence {
161        #[snafu(implicit)]
162        location: Location,
163        source: common_meta::error::Error,
164    },
165
166    #[snafu(display("Failed to start telemetry task"))]
167    StartTelemetryTask {
168        #[snafu(implicit)]
169        location: Location,
170        source: common_runtime::error::Error,
171    },
172
173    #[snafu(display("Failed to submit ddl task"))]
174    SubmitDdlTask {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_meta::error::Error,
178    },
179
180    #[snafu(display("Failed to submit reconcile procedure"))]
181    SubmitReconcileProcedure {
182        #[snafu(implicit)]
183        location: Location,
184        source: common_meta::error::Error,
185    },
186
187    #[snafu(display("Failed to invalidate table cache"))]
188    InvalidateTableCache {
189        #[snafu(implicit)]
190        location: Location,
191        source: common_meta::error::Error,
192    },
193
194    #[snafu(display("Failed to list catalogs"))]
195    ListCatalogs {
196        #[snafu(implicit)]
197        location: Location,
198        source: BoxedError,
199    },
200
201    #[snafu(display("Failed to list {}'s schemas", catalog))]
202    ListSchemas {
203        #[snafu(implicit)]
204        location: Location,
205        catalog: String,
206        source: BoxedError,
207    },
208
209    #[snafu(display("Failed to list {}.{}'s tables", catalog, schema))]
210    ListTables {
211        #[snafu(implicit)]
212        location: Location,
213        catalog: String,
214        schema: String,
215        source: BoxedError,
216    },
217
218    #[snafu(display("Failed to join a future"))]
219    Join {
220        #[snafu(implicit)]
221        location: Location,
222        #[snafu(source)]
223        error: JoinError,
224    },
225
226    #[snafu(display(
227        "Failed to request {}, required: {}, but only {} available",
228        select_target,
229        required,
230        available
231    ))]
232    NoEnoughAvailableNode {
233        #[snafu(implicit)]
234        location: Location,
235        required: usize,
236        available: usize,
237        select_target: SelectTarget,
238    },
239
240    #[snafu(display("Failed to send shutdown signal"))]
241    SendShutdownSignal {
242        #[snafu(source)]
243        error: SendError<()>,
244    },
245
246    #[snafu(display("Failed to shutdown {} server", server))]
247    ShutdownServer {
248        #[snafu(implicit)]
249        location: Location,
250        source: servers::error::Error,
251        server: String,
252    },
253
254    #[snafu(display("Empty key is not allowed"))]
255    EmptyKey {
256        #[snafu(implicit)]
257        location: Location,
258    },
259
260    #[snafu(display("Failed to execute via Etcd"))]
261    EtcdFailed {
262        #[snafu(source)]
263        error: etcd_client::Error,
264        #[snafu(implicit)]
265        location: Location,
266    },
267
268    #[snafu(display("Failed to connect to Etcd"))]
269    ConnectEtcd {
270        #[snafu(source)]
271        error: etcd_client::Error,
272        #[snafu(implicit)]
273        location: Location,
274    },
275
276    #[snafu(display("Failed to read file: {}", path))]
277    FileIo {
278        #[snafu(source)]
279        error: std::io::Error,
280        #[snafu(implicit)]
281        location: Location,
282        path: String,
283    },
284
285    #[snafu(display("Failed to bind address {}", addr))]
286    TcpBind {
287        addr: String,
288        #[snafu(source)]
289        error: std::io::Error,
290        #[snafu(implicit)]
291        location: Location,
292    },
293
294    #[snafu(display("Failed to start gRPC server"))]
295    StartGrpc {
296        #[snafu(source)]
297        error: tonic::transport::Error,
298        #[snafu(implicit)]
299        location: Location,
300    },
301
302    #[snafu(display("Failed to start http server"))]
303    StartHttp {
304        #[snafu(implicit)]
305        location: Location,
306        source: servers::error::Error,
307    },
308
309    #[snafu(display("Failed to parse address {}", addr))]
310    ParseAddr {
311        addr: String,
312        #[snafu(source)]
313        error: std::net::AddrParseError,
314    },
315
316    #[snafu(display("Invalid lease key: {}", key))]
317    InvalidLeaseKey {
318        key: String,
319        #[snafu(implicit)]
320        location: Location,
321    },
322
323    #[snafu(display("Invalid datanode stat key: {}", key))]
324    InvalidStatKey {
325        key: String,
326        #[snafu(implicit)]
327        location: Location,
328    },
329
330    #[snafu(display("Invalid inactive region key: {}", key))]
331    InvalidInactiveRegionKey {
332        key: String,
333        #[snafu(implicit)]
334        location: Location,
335    },
336
337    #[snafu(display("Failed to parse lease key from utf8"))]
338    LeaseKeyFromUtf8 {
339        #[snafu(source)]
340        error: std::string::FromUtf8Error,
341        #[snafu(implicit)]
342        location: Location,
343    },
344
345    #[snafu(display("Failed to parse lease value from utf8"))]
346    LeaseValueFromUtf8 {
347        #[snafu(source)]
348        error: std::string::FromUtf8Error,
349        #[snafu(implicit)]
350        location: Location,
351    },
352
353    #[snafu(display("Failed to parse invalid region key from utf8"))]
354    InvalidRegionKeyFromUtf8 {
355        #[snafu(source)]
356        error: std::string::FromUtf8Error,
357        #[snafu(implicit)]
358        location: Location,
359    },
360
361    #[snafu(display("Failed to serialize to json: {}", input))]
362    SerializeToJson {
363        input: String,
364        #[snafu(source)]
365        error: serde_json::error::Error,
366        #[snafu(implicit)]
367        location: Location,
368    },
369
370    #[snafu(display("Failed to deserialize from json: {}", input))]
371    DeserializeFromJson {
372        input: String,
373        #[snafu(source)]
374        error: serde_json::error::Error,
375        #[snafu(implicit)]
376        location: Location,
377    },
378
379    #[snafu(display("Failed to serialize config"))]
380    SerializeConfig {
381        #[snafu(source)]
382        error: serde_json::error::Error,
383        #[snafu(implicit)]
384        location: Location,
385    },
386
387    #[snafu(display("Failed to parse number: {}", err_msg))]
388    ParseNum {
389        err_msg: String,
390        #[snafu(source)]
391        error: std::num::ParseIntError,
392        #[snafu(implicit)]
393        location: Location,
394    },
395
396    #[snafu(display("Failed to parse bool: {}", err_msg))]
397    ParseBool {
398        err_msg: String,
399        #[snafu(source)]
400        error: std::str::ParseBoolError,
401        #[snafu(implicit)]
402        location: Location,
403    },
404
405    #[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
406    DowngradeLeader {
407        region_id: RegionId,
408        #[snafu(implicit)]
409        location: Location,
410        #[snafu(source)]
411        source: BoxedError,
412    },
413
414    #[snafu(display("Region's leader peer changed: {}", msg))]
415    LeaderPeerChanged {
416        msg: String,
417        #[snafu(implicit)]
418        location: Location,
419    },
420
421    #[snafu(display("Invalid arguments: {}", err_msg))]
422    InvalidArguments {
423        err_msg: String,
424        #[snafu(implicit)]
425        location: Location,
426    },
427
428    #[cfg(feature = "mysql_kvbackend")]
429    #[snafu(display("Failed to parse mysql url: {}", mysql_url))]
430    ParseMySqlUrl {
431        #[snafu(source)]
432        error: sqlx::error::Error,
433        mysql_url: String,
434        #[snafu(implicit)]
435        location: Location,
436    },
437
438    #[cfg(feature = "mysql_kvbackend")]
439    #[snafu(display("Failed to decode sql value"))]
440    DecodeSqlValue {
441        #[snafu(source)]
442        error: sqlx::error::Error,
443        #[snafu(implicit)]
444        location: Location,
445    },
446
447    #[snafu(display("Failed to find table route for {table_id}"))]
448    TableRouteNotFound {
449        table_id: TableId,
450        #[snafu(implicit)]
451        location: Location,
452    },
453
454    #[snafu(display("Failed to find table route for {region_id}"))]
455    RegionRouteNotFound {
456        region_id: RegionId,
457        #[snafu(implicit)]
458        location: Location,
459    },
460
461    #[snafu(display("Table info not found: {}", table_id))]
462    TableInfoNotFound {
463        table_id: TableId,
464        #[snafu(implicit)]
465        location: Location,
466    },
467
468    #[snafu(display("Datanode table not found: {}, datanode: {}", table_id, datanode_id))]
469    DatanodeTableNotFound {
470        table_id: TableId,
471        datanode_id: DatanodeId,
472        #[snafu(implicit)]
473        location: Location,
474    },
475
476    #[snafu(display("Metasrv has no leader at this moment"))]
477    NoLeader {
478        #[snafu(implicit)]
479        location: Location,
480    },
481
482    #[snafu(display("Leader lease expired"))]
483    LeaderLeaseExpired {
484        #[snafu(implicit)]
485        location: Location,
486    },
487
488    #[snafu(display("Leader lease changed during election"))]
489    LeaderLeaseChanged {
490        #[snafu(implicit)]
491        location: Location,
492    },
493
494    #[snafu(display("Table {} not found", name))]
495    TableNotFound {
496        name: String,
497        #[snafu(implicit)]
498        location: Location,
499    },
500
501    #[snafu(display("Unsupported selector type, {}", selector_type))]
502    UnsupportedSelectorType {
503        selector_type: String,
504        #[snafu(implicit)]
505        location: Location,
506    },
507
508    #[snafu(display("Unexpected, violated: {violated}"))]
509    Unexpected {
510        violated: String,
511        #[snafu(implicit)]
512        location: Location,
513    },
514
515    #[snafu(display("Failed to create gRPC channel"))]
516    CreateChannel {
517        #[snafu(implicit)]
518        location: Location,
519        source: common_grpc::error::Error,
520    },
521
522    #[snafu(display("Failed to batch get KVs from leader's in_memory kv store"))]
523    BatchGet {
524        #[snafu(source)]
525        error: tonic::Status,
526        #[snafu(implicit)]
527        location: Location,
528    },
529
530    #[snafu(display("Failed to batch range KVs from leader's in_memory kv store"))]
531    Range {
532        #[snafu(source)]
533        error: tonic::Status,
534        #[snafu(implicit)]
535        location: Location,
536    },
537
538    #[snafu(display("Response header not found"))]
539    ResponseHeaderNotFound {
540        #[snafu(implicit)]
541        location: Location,
542    },
543
544    #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
545    IsNotLeader {
546        node_addr: String,
547        #[snafu(implicit)]
548        location: Location,
549    },
550
551    #[snafu(display("Invalid http body"))]
552    InvalidHttpBody {
553        #[snafu(source)]
554        error: http::Error,
555        #[snafu(implicit)]
556        location: Location,
557    },
558
559    #[snafu(display(
560        "The number of retries for the grpc call {} exceeded the limit, {}",
561        func_name,
562        retry_num
563    ))]
564    ExceededRetryLimit {
565        func_name: String,
566        retry_num: usize,
567        #[snafu(implicit)]
568        location: Location,
569    },
570
571    #[snafu(display("Invalid utf-8 value"))]
572    InvalidUtf8Value {
573        #[snafu(source)]
574        error: std::string::FromUtf8Error,
575        #[snafu(implicit)]
576        location: Location,
577    },
578
579    #[snafu(display("Missing required parameter, param: {:?}", param))]
580    MissingRequiredParameter { param: String },
581
582    #[snafu(display("Failed to start procedure manager"))]
583    StartProcedureManager {
584        #[snafu(implicit)]
585        location: Location,
586        source: common_procedure::Error,
587    },
588
589    #[snafu(display("Failed to stop procedure manager"))]
590    StopProcedureManager {
591        #[snafu(implicit)]
592        location: Location,
593        source: common_procedure::Error,
594    },
595
596    #[snafu(display("Failed to wait procedure done"))]
597    WaitProcedure {
598        #[snafu(implicit)]
599        location: Location,
600        source: common_procedure::Error,
601    },
602
603    #[snafu(display("Failed to query procedure state"))]
604    QueryProcedure {
605        #[snafu(implicit)]
606        location: Location,
607        source: common_procedure::Error,
608    },
609
610    #[snafu(display("Procedure not found: {pid}"))]
611    ProcedureNotFound {
612        #[snafu(implicit)]
613        location: Location,
614        pid: String,
615    },
616
617    #[snafu(display("Failed to submit procedure"))]
618    SubmitProcedure {
619        #[snafu(implicit)]
620        location: Location,
621        source: common_procedure::Error,
622    },
623
624    #[snafu(display("A prune task for topic {} is already running", topic))]
625    PruneTaskAlreadyRunning {
626        topic: String,
627        #[snafu(implicit)]
628        location: Location,
629    },
630
631    #[snafu(display("Schema already exists, name: {schema_name}"))]
632    SchemaAlreadyExists {
633        schema_name: String,
634        #[snafu(implicit)]
635        location: Location,
636    },
637
638    #[snafu(display("Table already exists: {table_name}"))]
639    TableAlreadyExists {
640        table_name: String,
641        #[snafu(implicit)]
642        location: Location,
643    },
644
645    #[snafu(display("Pusher not found: {pusher_id}"))]
646    PusherNotFound {
647        pusher_id: String,
648        #[snafu(implicit)]
649        location: Location,
650    },
651
652    #[snafu(display("Failed to push message: {err_msg}"))]
653    PushMessage {
654        err_msg: String,
655        #[snafu(implicit)]
656        location: Location,
657    },
658
659    #[snafu(display("Mailbox already closed: {id}"))]
660    MailboxClosed {
661        id: u64,
662        #[snafu(implicit)]
663        location: Location,
664    },
665
666    #[snafu(display("Mailbox timeout: {id}"))]
667    MailboxTimeout {
668        id: u64,
669        #[snafu(implicit)]
670        location: Location,
671    },
672
673    #[snafu(display("Mailbox receiver got an error: {id}, {err_msg}"))]
674    MailboxReceiver {
675        id: u64,
676        err_msg: String,
677        #[snafu(implicit)]
678        location: Location,
679    },
680
681    #[snafu(display("Mailbox channel closed: {channel}"))]
682    MailboxChannelClosed {
683        channel: Channel,
684        #[snafu(implicit)]
685        location: Location,
686    },
687
688    #[snafu(display("Missing request header"))]
689    MissingRequestHeader {
690        #[snafu(implicit)]
691        location: Location,
692    },
693
694    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
695    RegisterProcedureLoader {
696        type_name: String,
697        #[snafu(implicit)]
698        location: Location,
699        source: common_procedure::error::Error,
700    },
701
702    #[snafu(display(
703        "Received unexpected instruction reply, mailbox message: {}, reason: {}",
704        mailbox_message,
705        reason
706    ))]
707    UnexpectedInstructionReply {
708        mailbox_message: String,
709        reason: String,
710        #[snafu(implicit)]
711        location: Location,
712    },
713
714    #[snafu(display("Expected to retry later, reason: {}", reason))]
715    RetryLater {
716        reason: String,
717        #[snafu(implicit)]
718        location: Location,
719    },
720
721    #[snafu(display("Expected to retry later, reason: {}", reason))]
722    RetryLaterWithSource {
723        reason: String,
724        #[snafu(implicit)]
725        location: Location,
726        source: BoxedError,
727    },
728
729    #[snafu(display("Failed to convert proto data"))]
730    ConvertProtoData {
731        #[snafu(implicit)]
732        location: Location,
733        source: common_meta::error::Error,
734    },
735
736    // this error is used for custom error mapping
737    // please do not delete it
738    #[snafu(display("Other error"))]
739    Other {
740        source: BoxedError,
741        #[snafu(implicit)]
742        location: Location,
743    },
744
745    #[snafu(display("Table metadata manager error"))]
746    TableMetadataManager {
747        source: common_meta::error::Error,
748        #[snafu(implicit)]
749        location: Location,
750    },
751
752    #[snafu(display("Runtime switch manager error"))]
753    RuntimeSwitchManager {
754        source: common_meta::error::Error,
755        #[snafu(implicit)]
756        location: Location,
757    },
758
759    #[snafu(display("Keyvalue backend error"))]
760    KvBackend {
761        source: common_meta::error::Error,
762        #[snafu(implicit)]
763        location: Location,
764    },
765
766    #[snafu(display("Failed to publish message"))]
767    PublishMessage {
768        #[snafu(source)]
769        error: SendError<Message>,
770        #[snafu(implicit)]
771        location: Location,
772    },
773
774    #[snafu(display("Too many partitions"))]
775    TooManyPartitions {
776        #[snafu(implicit)]
777        location: Location,
778    },
779
780    #[snafu(display("Failed to create repartition subtasks"))]
781    RepartitionCreateSubtasks {
782        source: partition::error::Error,
783        #[snafu(implicit)]
784        location: Location,
785    },
786
787    #[snafu(display(
788        "Source partition expression '{}' does not match any existing region",
789        expr
790    ))]
791    RepartitionSourceExprMismatch {
792        expr: String,
793        #[snafu(implicit)]
794        location: Location,
795    },
796
797    #[snafu(display(
798        "Failed to get the state receiver for repartition subprocedure {}",
799        procedure_id
800    ))]
801    RepartitionSubprocedureStateReceiver {
802        procedure_id: ProcedureId,
803        #[snafu(source)]
804        source: common_procedure::Error,
805        #[snafu(implicit)]
806        location: Location,
807    },
808
809    #[snafu(display("Unsupported operation {}", operation))]
810    Unsupported {
811        operation: String,
812        #[snafu(implicit)]
813        location: Location,
814    },
815
816    #[snafu(display("Unexpected table route type: {}", err_msg))]
817    UnexpectedLogicalRouteTable {
818        #[snafu(implicit)]
819        location: Location,
820        err_msg: String,
821        source: common_meta::error::Error,
822    },
823
824    #[snafu(display("Failed to save cluster info"))]
825    SaveClusterInfo {
826        #[snafu(implicit)]
827        location: Location,
828        source: common_meta::error::Error,
829    },
830
831    #[snafu(display("Invalid cluster info format"))]
832    InvalidClusterInfoFormat {
833        #[snafu(implicit)]
834        location: Location,
835        source: common_meta::error::Error,
836    },
837
838    #[snafu(display("Invalid datanode stat format"))]
839    InvalidDatanodeStatFormat {
840        #[snafu(implicit)]
841        location: Location,
842        source: common_meta::error::Error,
843    },
844
845    #[snafu(display("Invalid node info format"))]
846    InvalidNodeInfoFormat {
847        #[snafu(implicit)]
848        location: Location,
849        source: common_meta::error::Error,
850    },
851
852    #[snafu(display("Failed to serialize options to TOML"))]
853    TomlFormat {
854        #[snafu(implicit)]
855        location: Location,
856        #[snafu(source(from(common_config::error::Error, Box::new)))]
857        source: Box<common_config::error::Error>,
858    },
859
860    #[cfg(feature = "pg_kvbackend")]
861    #[snafu(display("Failed to execute via postgres, sql: {}", sql))]
862    PostgresExecution {
863        #[snafu(source)]
864        error: tokio_postgres::Error,
865        sql: String,
866        #[snafu(implicit)]
867        location: Location,
868    },
869
870    #[cfg(feature = "pg_kvbackend")]
871    #[snafu(display("Failed to get Postgres client"))]
872    GetPostgresClient {
873        #[snafu(implicit)]
874        location: Location,
875        #[snafu(source)]
876        error: deadpool::managed::PoolError<tokio_postgres::Error>,
877    },
878
879    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
880    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
881    SqlExecutionTimeout {
882        #[snafu(implicit)]
883        location: Location,
884        sql: String,
885        duration: std::time::Duration,
886    },
887
888    #[cfg(feature = "pg_kvbackend")]
889    #[snafu(display("Failed to create connection pool for Postgres"))]
890    CreatePostgresPool {
891        #[snafu(source)]
892        error: deadpool_postgres::CreatePoolError,
893        #[snafu(implicit)]
894        location: Location,
895    },
896
897    #[cfg(feature = "pg_kvbackend")]
898    #[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
899    GetPostgresConnection {
900        reason: String,
901        #[snafu(implicit)]
902        location: Location,
903    },
904
905    #[cfg(feature = "mysql_kvbackend")]
906    #[snafu(display("Failed to execute via mysql, sql: {}", sql))]
907    MySqlExecution {
908        #[snafu(source)]
909        error: sqlx::Error,
910        #[snafu(implicit)]
911        location: Location,
912        sql: String,
913    },
914
915    #[cfg(feature = "mysql_kvbackend")]
916    #[snafu(display("Failed to create mysql pool"))]
917    CreateMySqlPool {
918        #[snafu(source)]
919        error: sqlx::Error,
920        #[snafu(implicit)]
921        location: Location,
922    },
923
924    #[cfg(feature = "mysql_kvbackend")]
925    #[snafu(display("Failed to acquire mysql client from pool"))]
926    AcquireMySqlClient {
927        #[snafu(source)]
928        error: sqlx::Error,
929        #[snafu(implicit)]
930        location: Location,
931    },
932
933    #[snafu(display("Handler not found: {}", name))]
934    HandlerNotFound {
935        name: String,
936        #[snafu(implicit)]
937        location: Location,
938    },
939
940    #[snafu(display("Flow state handler error"))]
941    FlowStateHandler {
942        #[snafu(implicit)]
943        location: Location,
944        source: common_meta::error::Error,
945    },
946
947    #[snafu(display("Failed to build wal provider"))]
948    BuildWalProvider {
949        #[snafu(implicit)]
950        location: Location,
951        source: common_meta::error::Error,
952    },
953
954    #[snafu(display("Failed to parse wal options"))]
955    ParseWalOptions {
956        #[snafu(implicit)]
957        location: Location,
958        source: common_meta::error::Error,
959    },
960
961    #[snafu(display("Failed to build kafka client."))]
962    BuildKafkaClient {
963        #[snafu(implicit)]
964        location: Location,
965        #[snafu(source)]
966        error: common_meta::error::Error,
967    },
968
969    #[snafu(display(
970        "Failed to build a Kafka partition client, topic: {}, partition: {}",
971        topic,
972        partition
973    ))]
974    BuildPartitionClient {
975        topic: String,
976        partition: i32,
977        #[snafu(implicit)]
978        location: Location,
979        #[snafu(source)]
980        error: rskafka::client::error::Error,
981    },
982
983    #[snafu(display(
984        "Failed to delete records from Kafka, topic: {}, partition: {}, offset: {}",
985        topic,
986        partition,
987        offset
988    ))]
989    DeleteRecords {
990        #[snafu(implicit)]
991        location: Location,
992        #[snafu(source)]
993        error: rskafka::client::error::Error,
994        topic: String,
995        partition: i32,
996        offset: u64,
997    },
998
999    #[snafu(display("Failed to get offset from Kafka, topic: {}", topic))]
1000    GetOffset {
1001        topic: String,
1002        #[snafu(implicit)]
1003        location: Location,
1004        #[snafu(source)]
1005        error: rskafka::client::error::Error,
1006    },
1007
1008    #[snafu(display("Failed to update the TopicNameValue in kvbackend, topic: {}", topic))]
1009    UpdateTopicNameValue {
1010        topic: String,
1011        #[snafu(implicit)]
1012        location: Location,
1013        #[snafu(source)]
1014        source: common_meta::error::Error,
1015    },
1016
1017    #[snafu(display("Failed to build tls options"))]
1018    BuildTlsOptions {
1019        #[snafu(implicit)]
1020        location: Location,
1021        #[snafu(source)]
1022        source: common_meta::error::Error,
1023    },
1024
1025    #[snafu(display(
1026        "Repartition group {} source region missing, region id: {}",
1027        group_id,
1028        region_id
1029    ))]
1030    RepartitionSourceRegionMissing {
1031        group_id: Uuid,
1032        region_id: RegionId,
1033        #[snafu(implicit)]
1034        location: Location,
1035    },
1036
1037    #[snafu(display(
1038        "Repartition group {} target region missing, region id: {}",
1039        group_id,
1040        region_id
1041    ))]
1042    RepartitionTargetRegionMissing {
1043        group_id: Uuid,
1044        region_id: RegionId,
1045        #[snafu(implicit)]
1046        location: Location,
1047    },
1048
1049    #[snafu(display("Failed to serialize partition expression"))]
1050    SerializePartitionExpr {
1051        #[snafu(source)]
1052        source: partition::error::Error,
1053        #[snafu(implicit)]
1054        location: Location,
1055    },
1056
1057    #[snafu(display("Failed to deserialize partition expression"))]
1058    DeserializePartitionExpr {
1059        #[snafu(source)]
1060        source: partition::error::Error,
1061        #[snafu(implicit)]
1062        location: Location,
1063    },
1064
1065    #[snafu(display("Empty partition expression"))]
1066    EmptyPartitionExpr {
1067        #[snafu(implicit)]
1068        location: Location,
1069    },
1070
1071    #[snafu(display(
1072        "Partition expression mismatch, region id: {}, expected: {}, actual: {}",
1073        region_id,
1074        expected,
1075        actual
1076    ))]
1077    PartitionExprMismatch {
1078        region_id: RegionId,
1079        expected: String,
1080        actual: String,
1081        #[snafu(implicit)]
1082        location: Location,
1083    },
1084
1085    #[snafu(display("Failed to allocate regions for table: {}", table_id))]
1086    AllocateRegions {
1087        #[snafu(implicit)]
1088        location: Location,
1089        table_id: TableId,
1090        #[snafu(source)]
1091        source: common_meta::error::Error,
1092    },
1093
1094    #[snafu(display("Failed to deallocate regions for table: {}", table_id))]
1095    DeallocateRegions {
1096        #[snafu(implicit)]
1097        location: Location,
1098        table_id: TableId,
1099        #[snafu(source)]
1100        source: common_meta::error::Error,
1101    },
1102
1103    #[snafu(display("Failed to build create request for table: {}", table_id))]
1104    BuildCreateRequest {
1105        #[snafu(implicit)]
1106        location: Location,
1107        table_id: TableId,
1108        #[snafu(source)]
1109        source: common_meta::error::Error,
1110    },
1111
1112    #[snafu(display("Failed to allocate region routes for table: {}", table_id))]
1113    AllocateRegionRoutes {
1114        #[snafu(implicit)]
1115        location: Location,
1116        table_id: TableId,
1117        #[snafu(source)]
1118        source: common_meta::error::Error,
1119    },
1120
1121    #[snafu(display("Failed to allocate wal options for table: {}", table_id))]
1122    AllocateWalOptions {
1123        #[snafu(implicit)]
1124        location: Location,
1125        table_id: TableId,
1126        #[snafu(source)]
1127        source: common_meta::error::Error,
1128    },
1129}
1130
1131impl Error {
1132    /// Returns `true` if the error is retryable.
1133    pub fn is_retryable(&self) -> bool {
1134        matches!(
1135            self,
1136            Error::RetryLater { .. }
1137                | Error::RetryLaterWithSource { .. }
1138                | Error::MailboxTimeout { .. }
1139        )
1140    }
1141}
1142
1143pub type Result<T> = std::result::Result<T, Error>;
1144
1145define_into_tonic_status!(Error);
1146
1147impl ErrorExt for Error {
1148    fn status_code(&self) -> StatusCode {
1149        match self {
1150            Error::EtcdFailed { .. }
1151            | Error::ConnectEtcd { .. }
1152            | Error::FileIo { .. }
1153            | Error::TcpBind { .. }
1154            | Error::SerializeConfig { .. }
1155            | Error::SerializeToJson { .. }
1156            | Error::DeserializeFromJson { .. }
1157            | Error::NoLeader { .. }
1158            | Error::LeaderLeaseExpired { .. }
1159            | Error::LeaderLeaseChanged { .. }
1160            | Error::CreateChannel { .. }
1161            | Error::BatchGet { .. }
1162            | Error::Range { .. }
1163            | Error::ResponseHeaderNotFound { .. }
1164            | Error::InvalidHttpBody { .. }
1165            | Error::ExceededRetryLimit { .. }
1166            | Error::SendShutdownSignal { .. }
1167            | Error::PushMessage { .. }
1168            | Error::MailboxClosed { .. }
1169            | Error::MailboxReceiver { .. }
1170            | Error::StartGrpc { .. }
1171            | Error::PublishMessage { .. }
1172            | Error::Join { .. }
1173            | Error::ChooseItems { .. }
1174            | Error::FlowStateHandler { .. }
1175            | Error::BuildWalProvider { .. }
1176            | Error::BuildPartitionClient { .. }
1177            | Error::BuildKafkaClient { .. } => StatusCode::Internal,
1178
1179            Error::DeleteRecords { .. }
1180            | Error::GetOffset { .. }
1181            | Error::PeerUnavailable { .. }
1182            | Error::PusherNotFound { .. } => StatusCode::Unexpected,
1183            Error::MailboxTimeout { .. } | Error::ExceededDeadline { .. } => StatusCode::Cancelled,
1184            Error::PruneTaskAlreadyRunning { .. }
1185            | Error::RetryLater { .. }
1186            | Error::MailboxChannelClosed { .. }
1187            | Error::IsNotLeader { .. } => StatusCode::IllegalState,
1188            Error::RetryLaterWithSource { source, .. } => source.status_code(),
1189            Error::SerializePartitionExpr { source, .. }
1190            | Error::DeserializePartitionExpr { source, .. } => source.status_code(),
1191
1192            Error::Unsupported { .. } => StatusCode::Unsupported,
1193
1194            Error::SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1195
1196            Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1197            Error::EmptyKey { .. }
1198            | Error::MissingRequiredParameter { .. }
1199            | Error::MissingRequestHeader { .. }
1200            | Error::InvalidLeaseKey { .. }
1201            | Error::InvalidStatKey { .. }
1202            | Error::InvalidInactiveRegionKey { .. }
1203            | Error::ParseNum { .. }
1204            | Error::ParseBool { .. }
1205            | Error::ParseAddr { .. }
1206            | Error::UnsupportedSelectorType { .. }
1207            | Error::InvalidArguments { .. }
1208            | Error::ProcedureNotFound { .. }
1209            | Error::TooManyPartitions { .. }
1210            | Error::TomlFormat { .. }
1211            | Error::HandlerNotFound { .. }
1212            | Error::LeaderPeerChanged { .. }
1213            | Error::RepartitionSourceRegionMissing { .. }
1214            | Error::RepartitionTargetRegionMissing { .. }
1215            | Error::PartitionExprMismatch { .. }
1216            | Error::RepartitionSourceExprMismatch { .. }
1217            | Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
1218            Error::LeaseKeyFromUtf8 { .. }
1219            | Error::LeaseValueFromUtf8 { .. }
1220            | Error::InvalidRegionKeyFromUtf8 { .. }
1221            | Error::TableRouteNotFound { .. }
1222            | Error::TableInfoNotFound { .. }
1223            | Error::DatanodeTableNotFound { .. }
1224            | Error::InvalidUtf8Value { .. }
1225            | Error::UnexpectedInstructionReply { .. }
1226            | Error::Unexpected { .. }
1227            | Error::RegionOperatingRace { .. }
1228            | Error::RegionRouteNotFound { .. }
1229            | Error::MigrationAbort { .. }
1230            | Error::MigrationRunning { .. }
1231            | Error::RegionMigrated { .. } => StatusCode::Unexpected,
1232            Error::TableNotFound { .. } => StatusCode::TableNotFound,
1233            Error::SaveClusterInfo { source, .. }
1234            | Error::InvalidClusterInfoFormat { source, .. }
1235            | Error::InvalidDatanodeStatFormat { source, .. }
1236            | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
1237            Error::InvalidateTableCache { source, .. } => source.status_code(),
1238            Error::SubmitProcedure { source, .. }
1239            | Error::WaitProcedure { source, .. }
1240            | Error::QueryProcedure { source, .. } => source.status_code(),
1241            Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
1242                source.status_code()
1243            }
1244            Error::StartProcedureManager { source, .. }
1245            | Error::StopProcedureManager { source, .. } => source.status_code(),
1246
1247            Error::ListCatalogs { source, .. }
1248            | Error::ListSchemas { source, .. }
1249            | Error::ListTables { source, .. } => source.status_code(),
1250            Error::StartTelemetryTask { source, .. } => source.status_code(),
1251
1252            Error::NextSequence { source, .. }
1253            | Error::SetNextSequence { source, .. }
1254            | Error::PeekSequence { source, .. } => source.status_code(),
1255            Error::DowngradeLeader { source, .. } => source.status_code(),
1256            Error::RegisterProcedureLoader { source, .. } => source.status_code(),
1257            Error::SubmitDdlTask { source, .. }
1258            | Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
1259            Error::ConvertProtoData { source, .. }
1260            | Error::TableMetadataManager { source, .. }
1261            | Error::RuntimeSwitchManager { source, .. }
1262            | Error::KvBackend { source, .. }
1263            | Error::UnexpectedLogicalRouteTable { source, .. }
1264            | Error::UpdateTopicNameValue { source, .. }
1265            | Error::ParseWalOptions { source, .. } => source.status_code(),
1266            Error::ListActiveFrontends { source, .. }
1267            | Error::ListActiveDatanodes { source, .. }
1268            | Error::ListActiveFlownodes { source, .. } => source.status_code(),
1269            Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
1270
1271            Error::InitMetadata { source, .. }
1272            | Error::InitDdlManager { source, .. }
1273            | Error::InitReconciliationManager { source, .. } => source.status_code(),
1274
1275            Error::BuildTlsOptions { source, .. } => source.status_code(),
1276            Error::Other { source, .. } => source.status_code(),
1277            Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
1278            Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
1279            Error::AllocateRegions { source, .. } => source.status_code(),
1280            Error::DeallocateRegions { source, .. } => source.status_code(),
1281            Error::AllocateRegionRoutes { source, .. } => source.status_code(),
1282            Error::AllocateWalOptions { source, .. } => source.status_code(),
1283            Error::BuildCreateRequest { source, .. } => source.status_code(),
1284            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
1285
1286            #[cfg(feature = "pg_kvbackend")]
1287            Error::CreatePostgresPool { .. }
1288            | Error::GetPostgresClient { .. }
1289            | Error::GetPostgresConnection { .. }
1290            | Error::PostgresExecution { .. } => StatusCode::Internal,
1291            #[cfg(feature = "mysql_kvbackend")]
1292            Error::MySqlExecution { .. }
1293            | Error::CreateMySqlPool { .. }
1294            | Error::ParseMySqlUrl { .. }
1295            | Error::DecodeSqlValue { .. }
1296            | Error::AcquireMySqlClient { .. } => StatusCode::Internal,
1297            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1298            Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
1299        }
1300    }
1301
1302    fn as_any(&self) -> &dyn std::any::Any {
1303        self
1304    }
1305}
1306
1307// for form tonic
1308pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
1309    let mut err: &(dyn std::error::Error + 'static) = err_status;
1310
1311    loop {
1312        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
1313            return Some(io_err);
1314        }
1315
1316        // h2::Error do not expose std::io::Error with `source()`
1317        // https://github.com/hyperium/h2/pull/462
1318        if let Some(h2_err) = err.downcast_ref::<h2::Error>()
1319            && let Some(io_err) = h2_err.get_io()
1320        {
1321            return Some(io_err);
1322        }
1323
1324        err = err.source()?;
1325    }
1326}