common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to register repartition procedure loader"))]
108    RegisterRepartitionProcedureLoader {
109        #[snafu(implicit)]
110        location: Location,
111        source: BoxedError,
112    },
113
114    #[snafu(display("Failed to create repartition procedure"))]
115    CreateRepartitionProcedure {
116        source: BoxedError,
117        #[snafu(implicit)]
118        location: Location,
119    },
120
121    #[snafu(display("Failed to submit procedure"))]
122    SubmitProcedure {
123        #[snafu(implicit)]
124        location: Location,
125        source: common_procedure::Error,
126    },
127
128    #[snafu(display("Failed to query procedure"))]
129    QueryProcedure {
130        #[snafu(implicit)]
131        location: Location,
132        source: common_procedure::Error,
133    },
134
135    #[snafu(display("Procedure not found: {pid}"))]
136    ProcedureNotFound {
137        #[snafu(implicit)]
138        location: Location,
139        pid: String,
140    },
141
142    #[snafu(display("Failed to parse procedure id: {key}"))]
143    ParseProcedureId {
144        #[snafu(implicit)]
145        location: Location,
146        key: String,
147        #[snafu(source)]
148        error: common_procedure::ParseIdError,
149    },
150
151    #[snafu(display("Unsupported operation {}", operation))]
152    Unsupported {
153        operation: String,
154        #[snafu(implicit)]
155        location: Location,
156    },
157
158    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
159    ProcedureStateReceiver {
160        procedure_id: ProcedureId,
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
167    ProcedureStateReceiverNotFound {
168        procedure_id: ProcedureId,
169        #[snafu(implicit)]
170        location: Location,
171    },
172
173    #[snafu(display("Failed to wait procedure done"))]
174    WaitProcedure {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display("Failed to start procedure manager"))]
181    StartProcedureManager {
182        #[snafu(implicit)]
183        location: Location,
184        source: common_procedure::Error,
185    },
186
187    #[snafu(display("Failed to stop procedure manager"))]
188    StopProcedureManager {
189        #[snafu(implicit)]
190        location: Location,
191        source: common_procedure::Error,
192    },
193
194    #[snafu(display(
195        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
196    ))]
197    ProcedureOutput {
198        procedure_id: String,
199        err_msg: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Primary key '{key}' not found when creating region request"))]
205    PrimaryKeyNotFound {
206        key: String,
207        #[snafu(implicit)]
208        location: Location,
209    },
210
211    #[snafu(display("Failed to build table meta for table: {}", table_name))]
212    BuildTableMeta {
213        table_name: String,
214        #[snafu(source)]
215        error: table::metadata::TableMetaBuilderError,
216        #[snafu(implicit)]
217        location: Location,
218    },
219
220    #[snafu(display("Table occurs error"))]
221    Table {
222        #[snafu(implicit)]
223        location: Location,
224        source: table::error::Error,
225    },
226
227    #[snafu(display("Failed to find table route for table id {}", table_id))]
228    TableRouteNotFound {
229        table_id: TableId,
230        #[snafu(implicit)]
231        location: Location,
232    },
233
234    #[snafu(display("Failed to find table repartition metadata for table id {}", table_id))]
235    TableRepartNotFound {
236        table_id: TableId,
237        #[snafu(implicit)]
238        location: Location,
239    },
240
241    #[snafu(display("Failed to decode protobuf"))]
242    DecodeProto {
243        #[snafu(implicit)]
244        location: Location,
245        #[snafu(source)]
246        error: prost::DecodeError,
247    },
248
249    #[snafu(display("Failed to encode object into json"))]
250    EncodeJson {
251        #[snafu(implicit)]
252        location: Location,
253        #[snafu(source)]
254        error: JsonError,
255    },
256
257    #[snafu(display("Failed to decode object from json"))]
258    DecodeJson {
259        #[snafu(implicit)]
260        location: Location,
261        #[snafu(source)]
262        error: JsonError,
263    },
264
265    #[snafu(display("Failed to serialize to json: {}", input))]
266    SerializeToJson {
267        input: String,
268        #[snafu(source)]
269        error: serde_json::error::Error,
270        #[snafu(implicit)]
271        location: Location,
272    },
273
274    #[snafu(display("Failed to deserialize from json: {}", input))]
275    DeserializeFromJson {
276        input: String,
277        #[snafu(source)]
278        error: serde_json::error::Error,
279        #[snafu(implicit)]
280        location: Location,
281    },
282
283    #[snafu(display("Payload not exist"))]
284    PayloadNotExist {
285        #[snafu(implicit)]
286        location: Location,
287    },
288
289    #[snafu(display("Failed to serde json"))]
290    SerdeJson {
291        #[snafu(source)]
292        error: serde_json::error::Error,
293        #[snafu(implicit)]
294        location: Location,
295    },
296
297    #[snafu(display("Failed to parse value {} into key {}", value, key))]
298    ParseOption {
299        key: String,
300        value: String,
301        #[snafu(implicit)]
302        location: Location,
303    },
304
305    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
306    RouteInfoCorrupted {
307        err_msg: String,
308        #[snafu(implicit)]
309        location: Location,
310    },
311
312    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
313    IllegalServerState {
314        code: i32,
315        err_msg: String,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Failed to convert alter table request"))]
321    ConvertAlterTableRequest {
322        source: common_grpc_expr::error::Error,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Invalid protobuf message: {err_msg}"))]
328    InvalidProtoMsg {
329        err_msg: String,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("Unexpected: {err_msg}"))]
335    Unexpected {
336        err_msg: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("Metasrv election has no leader at this moment"))]
342    ElectionNoLeader {
343        #[snafu(implicit)]
344        location: Location,
345    },
346
347    #[snafu(display("Metasrv election leader lease expired"))]
348    ElectionLeaderLeaseExpired {
349        #[snafu(implicit)]
350        location: Location,
351    },
352
353    #[snafu(display("Metasrv election leader lease changed during election"))]
354    ElectionLeaderLeaseChanged {
355        #[snafu(implicit)]
356        location: Location,
357    },
358
359    #[snafu(display("Table already exists, table: {}", table_name))]
360    TableAlreadyExists {
361        table_name: String,
362        #[snafu(implicit)]
363        location: Location,
364    },
365
366    #[snafu(display("View already exists, view: {}", view_name))]
367    ViewAlreadyExists {
368        view_name: String,
369        #[snafu(implicit)]
370        location: Location,
371    },
372
373    #[snafu(display("Flow already exists: {}", flow_name))]
374    FlowAlreadyExists {
375        flow_name: String,
376        #[snafu(implicit)]
377        location: Location,
378    },
379
380    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
381    SchemaAlreadyExists {
382        catalog: String,
383        schema: String,
384        #[snafu(implicit)]
385        location: Location,
386    },
387
388    #[snafu(display("Failed to convert raw key to str"))]
389    ConvertRawKey {
390        #[snafu(implicit)]
391        location: Location,
392        #[snafu(source)]
393        error: Utf8Error,
394    },
395
396    #[snafu(display("Table not found: '{}'", table_name))]
397    TableNotFound {
398        table_name: String,
399        #[snafu(implicit)]
400        location: Location,
401    },
402
403    #[snafu(display("Region not found: {}", region_id))]
404    RegionNotFound {
405        region_id: RegionId,
406        #[snafu(implicit)]
407        location: Location,
408    },
409
410    #[snafu(display("View not found: '{}'", view_name))]
411    ViewNotFound {
412        view_name: String,
413        #[snafu(implicit)]
414        location: Location,
415    },
416
417    #[snafu(display("Flow not found: '{}'", flow_name))]
418    FlowNotFound {
419        flow_name: String,
420        #[snafu(implicit)]
421        location: Location,
422    },
423
424    #[snafu(display("Flow route not found: '{}'", flow_name))]
425    FlowRouteNotFound {
426        flow_name: String,
427        #[snafu(implicit)]
428        location: Location,
429    },
430
431    #[snafu(display("Schema nod found, schema: {}", table_schema))]
432    SchemaNotFound {
433        table_schema: String,
434        #[snafu(implicit)]
435        location: Location,
436    },
437
438    #[snafu(display("Catalog not found, catalog: {}", catalog))]
439    CatalogNotFound {
440        catalog: String,
441        #[snafu(implicit)]
442        location: Location,
443    },
444
445    #[snafu(display("Invalid metadata, err: {}", err_msg))]
446    InvalidMetadata {
447        err_msg: String,
448        #[snafu(implicit)]
449        location: Location,
450    },
451
452    #[snafu(display("Invalid view info, err: {}", err_msg))]
453    InvalidViewInfo {
454        err_msg: String,
455        #[snafu(implicit)]
456        location: Location,
457    },
458
459    #[snafu(display("Invalid flow request body: {:?}", body))]
460    InvalidFlowRequestBody {
461        body: Box<Option<api::v1::flow::flow_request::Body>>,
462        #[snafu(implicit)]
463        location: Location,
464    },
465
466    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
467    GetKvCache { err_msg: String },
468
469    #[snafu(display("Get null from cache, key: {}", key))]
470    CacheNotGet {
471        key: String,
472        #[snafu(implicit)]
473        location: Location,
474    },
475
476    #[snafu(display("Etcd txn error: {err_msg}"))]
477    EtcdTxnOpResponse {
478        err_msg: String,
479        #[snafu(implicit)]
480        location: Location,
481    },
482
483    #[snafu(display("External error"))]
484    External {
485        #[snafu(implicit)]
486        location: Location,
487        source: BoxedError,
488    },
489
490    #[snafu(display("The response exceeded size limit"))]
491    ResponseExceededSizeLimit {
492        #[snafu(implicit)]
493        location: Location,
494        source: BoxedError,
495    },
496
497    #[snafu(display("Invalid heartbeat response"))]
498    InvalidHeartbeatResponse {
499        #[snafu(implicit)]
500        location: Location,
501    },
502
503    #[snafu(display("Failed to operate on datanode: {}", peer))]
504    OperateDatanode {
505        #[snafu(implicit)]
506        location: Location,
507        peer: Peer,
508        source: BoxedError,
509    },
510
511    #[snafu(display("Retry later"))]
512    RetryLater {
513        source: BoxedError,
514        clean_poisons: bool,
515    },
516
517    #[snafu(display("Abort procedure"))]
518    AbortProcedure {
519        #[snafu(implicit)]
520        location: Location,
521        source: BoxedError,
522        clean_poisons: bool,
523    },
524
525    #[snafu(display(
526        "Failed to encode a wal options to json string, wal_options: {:?}",
527        wal_options
528    ))]
529    EncodeWalOptions {
530        wal_options: WalOptions,
531        #[snafu(source)]
532        error: serde_json::Error,
533        #[snafu(implicit)]
534        location: Location,
535    },
536
537    #[snafu(display("Invalid number of topics {}", num_topics))]
538    InvalidNumTopics {
539        num_topics: usize,
540        #[snafu(implicit)]
541        location: Location,
542    },
543
544    #[snafu(display(
545        "Failed to build a Kafka client, broker endpoints: {:?}",
546        broker_endpoints
547    ))]
548    BuildKafkaClient {
549        broker_endpoints: Vec<String>,
550        #[snafu(implicit)]
551        location: Location,
552        #[snafu(source)]
553        error: rskafka::client::error::Error,
554    },
555
556    #[snafu(display("Failed to create TLS Config"))]
557    TlsConfig {
558        #[snafu(implicit)]
559        location: Location,
560        source: common_wal::error::Error,
561    },
562
563    #[snafu(display("Failed to build a Kafka controller client"))]
564    BuildKafkaCtrlClient {
565        #[snafu(implicit)]
566        location: Location,
567        #[snafu(source)]
568        error: rskafka::client::error::Error,
569    },
570
571    #[snafu(display(
572        "Failed to get a Kafka partition client, topic: {}, partition: {}",
573        topic,
574        partition
575    ))]
576    KafkaPartitionClient {
577        topic: String,
578        partition: i32,
579        #[snafu(implicit)]
580        location: Location,
581        #[snafu(source)]
582        error: rskafka::client::error::Error,
583    },
584
585    #[snafu(display(
586        "Failed to get offset from Kafka, topic: {}, partition: {}",
587        topic,
588        partition
589    ))]
590    KafkaGetOffset {
591        topic: String,
592        partition: i32,
593        #[snafu(implicit)]
594        location: Location,
595        #[snafu(source)]
596        error: rskafka::client::error::Error,
597    },
598
599    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
600    ProduceRecord {
601        topic: String,
602        #[snafu(implicit)]
603        location: Location,
604        #[snafu(source)]
605        error: rskafka::client::error::Error,
606    },
607
608    #[snafu(display("Failed to create a Kafka wal topic"))]
609    CreateKafkaWalTopic {
610        #[snafu(implicit)]
611        location: Location,
612        #[snafu(source)]
613        error: rskafka::client::error::Error,
614    },
615
616    #[snafu(display("The topic pool is empty"))]
617    EmptyTopicPool {
618        #[snafu(implicit)]
619        location: Location,
620    },
621
622    #[snafu(display("Unexpected table route type: {}", err_msg))]
623    UnexpectedLogicalRouteTable {
624        #[snafu(implicit)]
625        location: Location,
626        err_msg: String,
627    },
628
629    #[snafu(display("The tasks of {} cannot be empty", name))]
630    EmptyDdlTasks {
631        name: String,
632        #[snafu(implicit)]
633        location: Location,
634    },
635
636    #[snafu(display("Metadata corruption: {}", err_msg))]
637    MetadataCorruption {
638        err_msg: String,
639        #[snafu(implicit)]
640        location: Location,
641    },
642
643    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
644    AlterLogicalTablesInvalidArguments {
645        err_msg: String,
646        #[snafu(implicit)]
647        location: Location,
648    },
649
650    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
651    CreateLogicalTablesInvalidArguments {
652        err_msg: String,
653        #[snafu(implicit)]
654        location: Location,
655    },
656
657    #[snafu(display("Invalid node info key: {}", key))]
658    InvalidNodeInfoKey {
659        key: String,
660        #[snafu(implicit)]
661        location: Location,
662    },
663
664    #[snafu(display("Invalid node stat key: {}", key))]
665    InvalidStatKey {
666        key: String,
667        #[snafu(implicit)]
668        location: Location,
669    },
670
671    #[snafu(display("Failed to parse number: {}", err_msg))]
672    ParseNum {
673        err_msg: String,
674        #[snafu(source)]
675        error: std::num::ParseIntError,
676        #[snafu(implicit)]
677        location: Location,
678    },
679
680    #[snafu(display("Invalid role: {}", role))]
681    InvalidRole {
682        role: i32,
683        #[snafu(implicit)]
684        location: Location,
685    },
686
687    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
688    InvalidSetDatabaseOption {
689        key: String,
690        value: String,
691        #[snafu(implicit)]
692        location: Location,
693    },
694
695    #[snafu(display("Invalid unset database option, key: {}", key))]
696    InvalidUnsetDatabaseOption {
697        key: String,
698        #[snafu(implicit)]
699        location: Location,
700    },
701
702    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
703    MismatchPrefix {
704        prefix: String,
705        key: String,
706        #[snafu(implicit)]
707        location: Location,
708    },
709
710    #[snafu(display("Failed to move values: {err_msg}"))]
711    MoveValues {
712        err_msg: String,
713        #[snafu(implicit)]
714        location: Location,
715    },
716
717    #[snafu(display("Failed to parse {} from utf8", name))]
718    FromUtf8 {
719        name: String,
720        #[snafu(source)]
721        error: std::string::FromUtf8Error,
722        #[snafu(implicit)]
723        location: Location,
724    },
725
726    #[snafu(display("Value not exists"))]
727    ValueNotExist {
728        #[snafu(implicit)]
729        location: Location,
730    },
731
732    #[snafu(display("Failed to get cache"))]
733    GetCache { source: Arc<Error> },
734
735    #[snafu(display(
736        "Failed to get latest cache value after {} attempts due to concurrent invalidation",
737        attempts
738    ))]
739    GetLatestCacheRetryExceeded {
740        attempts: usize,
741        #[snafu(implicit)]
742        location: Location,
743    },
744
745    #[cfg(feature = "pg_kvbackend")]
746    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
747    PostgresExecution {
748        sql: String,
749        #[snafu(source)]
750        error: tokio_postgres::Error,
751        #[snafu(implicit)]
752        location: Location,
753    },
754
755    #[cfg(feature = "pg_kvbackend")]
756    #[snafu(display("Failed to create connection pool for Postgres"))]
757    CreatePostgresPool {
758        #[snafu(source)]
759        error: deadpool_postgres::CreatePoolError,
760        #[snafu(implicit)]
761        location: Location,
762    },
763
764    #[cfg(feature = "pg_kvbackend")]
765    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
766    GetPostgresConnection {
767        reason: String,
768        #[snafu(implicit)]
769        location: Location,
770    },
771
772    #[cfg(feature = "pg_kvbackend")]
773    #[snafu(display("Failed to get Postgres client"))]
774    GetPostgresClient {
775        #[snafu(source)]
776        error: deadpool::managed::PoolError<tokio_postgres::Error>,
777        #[snafu(implicit)]
778        location: Location,
779    },
780
781    #[cfg(feature = "pg_kvbackend")]
782    #[snafu(display("Failed to {} Postgres transaction", operation))]
783    PostgresTransaction {
784        #[snafu(source)]
785        error: tokio_postgres::Error,
786        #[snafu(implicit)]
787        location: Location,
788        operation: String,
789    },
790
791    #[cfg(feature = "pg_kvbackend")]
792    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
793    PostgresTlsConfig {
794        reason: String,
795        #[snafu(implicit)]
796        location: Location,
797    },
798
799    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
800    LoadTlsCertificate {
801        path: String,
802        #[snafu(source)]
803        error: std::io::Error,
804        #[snafu(implicit)]
805        location: Location,
806    },
807
808    #[cfg(feature = "pg_kvbackend")]
809    #[snafu(display("Invalid TLS configuration: {}", reason))]
810    InvalidTlsConfig {
811        reason: String,
812        #[snafu(implicit)]
813        location: Location,
814    },
815
816    #[cfg(feature = "mysql_kvbackend")]
817    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
818    MySqlExecution {
819        sql: String,
820        #[snafu(source)]
821        error: sqlx::Error,
822        #[snafu(implicit)]
823        location: Location,
824    },
825
826    #[cfg(feature = "mysql_kvbackend")]
827    #[snafu(display("Failed to create connection pool for MySql"))]
828    CreateMySqlPool {
829        #[snafu(source)]
830        error: sqlx::Error,
831        #[snafu(implicit)]
832        location: Location,
833    },
834
835    #[cfg(feature = "mysql_kvbackend")]
836    #[snafu(display("Failed to decode sql value"))]
837    DecodeSqlValue {
838        #[snafu(source)]
839        error: sqlx::error::Error,
840        #[snafu(implicit)]
841        location: Location,
842    },
843
844    #[cfg(feature = "mysql_kvbackend")]
845    #[snafu(display("Failed to acquire mysql client from pool"))]
846    AcquireMySqlClient {
847        #[snafu(source)]
848        error: sqlx::Error,
849        #[snafu(implicit)]
850        location: Location,
851    },
852
853    #[cfg(feature = "mysql_kvbackend")]
854    #[snafu(display("Failed to {} MySql transaction", operation))]
855    MySqlTransaction {
856        #[snafu(source)]
857        error: sqlx::Error,
858        #[snafu(implicit)]
859        location: Location,
860        operation: String,
861    },
862
863    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
864    #[snafu(display("Rds transaction retry failed"))]
865    RdsTransactionRetryFailed {
866        #[snafu(implicit)]
867        location: Location,
868    },
869
870    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
871    #[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
872    SqlExecutionTimeout {
873        sql: String,
874        duration: std::time::Duration,
875        #[snafu(implicit)]
876        location: Location,
877    },
878
879    #[snafu(display(
880        "Datanode table info not found, table id: {}, datanode id: {}",
881        table_id,
882        datanode_id
883    ))]
884    DatanodeTableInfoNotFound {
885        datanode_id: DatanodeId,
886        table_id: TableId,
887        #[snafu(implicit)]
888        location: Location,
889    },
890
891    #[snafu(display("Invalid topic name prefix: {}", prefix))]
892    InvalidTopicNamePrefix {
893        prefix: String,
894        #[snafu(implicit)]
895        location: Location,
896    },
897
898    #[snafu(display("Failed to parse wal options: {}", wal_options))]
899    ParseWalOptions {
900        wal_options: String,
901        #[snafu(implicit)]
902        location: Location,
903        #[snafu(source)]
904        error: serde_json::Error,
905    },
906
907    #[snafu(display("No leader found for table_id: {}", table_id))]
908    NoLeader {
909        table_id: TableId,
910        #[snafu(implicit)]
911        location: Location,
912    },
913
914    #[snafu(display(
915        "Procedure poison key already exists with a different value, key: {}, value: {}",
916        key,
917        value
918    ))]
919    ProcedurePoisonConflict {
920        key: String,
921        value: String,
922        #[snafu(implicit)]
923        location: Location,
924    },
925
926    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
927    PutPoison {
928        #[snafu(implicit)]
929        location: Location,
930        #[snafu(source)]
931        source: common_procedure::error::Error,
932    },
933
934    #[snafu(display("Invalid file path: {}", file_path))]
935    InvalidFilePath {
936        #[snafu(implicit)]
937        location: Location,
938        file_path: String,
939    },
940
941    #[snafu(display("Failed to serialize flexbuffers"))]
942    SerializeFlexbuffers {
943        #[snafu(implicit)]
944        location: Location,
945        #[snafu(source)]
946        error: flexbuffers::SerializationError,
947    },
948
949    #[snafu(display("Failed to deserialize flexbuffers"))]
950    DeserializeFlexbuffers {
951        #[snafu(implicit)]
952        location: Location,
953        #[snafu(source)]
954        error: flexbuffers::DeserializationError,
955    },
956
957    #[snafu(display("Failed to read flexbuffers"))]
958    ReadFlexbuffers {
959        #[snafu(implicit)]
960        location: Location,
961        #[snafu(source)]
962        error: flexbuffers::ReaderError,
963    },
964
965    #[snafu(display("Invalid file name: {}", reason))]
966    InvalidFileName {
967        #[snafu(implicit)]
968        location: Location,
969        reason: String,
970    },
971
972    #[snafu(display("Invalid file extension: {}", reason))]
973    InvalidFileExtension {
974        #[snafu(implicit)]
975        location: Location,
976        reason: String,
977    },
978
979    #[snafu(display("Failed to write object, file path: {}", file_path))]
980    WriteObject {
981        #[snafu(implicit)]
982        location: Location,
983        file_path: String,
984        #[snafu(source)]
985        error: object_store::Error,
986    },
987
988    #[snafu(display("Failed to read object, file path: {}", file_path))]
989    ReadObject {
990        #[snafu(implicit)]
991        location: Location,
992        file_path: String,
993        #[snafu(source)]
994        error: object_store::Error,
995    },
996
997    #[snafu(display("Missing column ids"))]
998    MissingColumnIds {
999        #[snafu(implicit)]
1000        location: Location,
1001    },
1002
1003    #[snafu(display(
1004        "Missing column in column metadata: {}, table: {}, table_id: {}",
1005        column_name,
1006        table_name,
1007        table_id,
1008    ))]
1009    MissingColumnInColumnMetadata {
1010        column_name: String,
1011        #[snafu(implicit)]
1012        location: Location,
1013        table_name: String,
1014        table_id: TableId,
1015    },
1016
1017    #[snafu(display(
1018        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
1019        column_name,
1020        column_id,
1021        table_name,
1022        table_id,
1023    ))]
1024    MismatchColumnId {
1025        column_name: String,
1026        column_id: u32,
1027        #[snafu(implicit)]
1028        location: Location,
1029        table_name: String,
1030        table_id: TableId,
1031    },
1032
1033    #[snafu(display("Failed to convert column def, column: {}", column))]
1034    ConvertColumnDef {
1035        column: String,
1036        #[snafu(implicit)]
1037        location: Location,
1038        source: api::error::Error,
1039    },
1040
1041    #[snafu(display("Failed to convert time ranges"))]
1042    ConvertTimeRanges {
1043        #[snafu(implicit)]
1044        location: Location,
1045        source: api::error::Error,
1046    },
1047
1048    #[snafu(display(
1049        "Column metadata inconsistencies found in table: {}, table_id: {}",
1050        table_name,
1051        table_id
1052    ))]
1053    ColumnMetadataConflicts {
1054        table_name: String,
1055        table_id: TableId,
1056    },
1057
1058    #[snafu(display(
1059        "Column not found in column metadata, column_name: {}, column_id: {}",
1060        column_name,
1061        column_id
1062    ))]
1063    ColumnNotFound { column_name: String, column_id: u32 },
1064
1065    #[snafu(display(
1066        "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1067        column_name,
1068        expected_column_id,
1069        actual_column_id
1070    ))]
1071    ColumnIdMismatch {
1072        column_name: String,
1073        expected_column_id: u32,
1074        actual_column_id: u32,
1075    },
1076
1077    #[snafu(display(
1078        "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1079        expected_column_name,
1080        expected_column_id,
1081        actual_column_name,
1082        actual_column_id,
1083    ))]
1084    TimestampMismatch {
1085        expected_column_name: String,
1086        expected_column_id: u32,
1087        actual_column_name: String,
1088        actual_column_id: u32,
1089    },
1090
1091    #[cfg(feature = "enterprise")]
1092    #[snafu(display("Too large duration"))]
1093    TooLargeDuration {
1094        #[snafu(source)]
1095        error: prost_types::DurationError,
1096        #[snafu(implicit)]
1097        location: Location,
1098    },
1099
1100    #[cfg(feature = "enterprise")]
1101    #[snafu(display("Negative duration"))]
1102    NegativeDuration {
1103        #[snafu(source)]
1104        error: prost_types::DurationError,
1105        #[snafu(implicit)]
1106        location: Location,
1107    },
1108
1109    #[cfg(feature = "enterprise")]
1110    #[snafu(display("Missing interval field"))]
1111    MissingInterval {
1112        #[snafu(implicit)]
1113        location: Location,
1114    },
1115}
1116
1117pub type Result<T> = std::result::Result<T, Error>;
1118
1119impl ErrorExt for Error {
1120    fn status_code(&self) -> StatusCode {
1121        use Error::*;
1122        match self {
1123            IllegalServerState { .. }
1124            | EtcdTxnOpResponse { .. }
1125            | EtcdFailed { .. }
1126            | EtcdTxnFailed { .. }
1127            | ConnectEtcd { .. }
1128            | MoveValues { .. }
1129            | GetCache { .. }
1130            | GetLatestCacheRetryExceeded { .. }
1131            | SerializeToJson { .. }
1132            | DeserializeFromJson { .. }
1133            | ElectionNoLeader { .. }
1134            | ElectionLeaderLeaseExpired { .. }
1135            | ElectionLeaderLeaseChanged { .. } => StatusCode::Internal,
1136
1137            NoLeader { .. } => StatusCode::TableUnavailable,
1138            ValueNotExist { .. }
1139            | ProcedurePoisonConflict { .. }
1140            | ProcedureStateReceiverNotFound { .. }
1141            | MissingColumnIds { .. }
1142            | MissingColumnInColumnMetadata { .. }
1143            | MismatchColumnId { .. }
1144            | ColumnMetadataConflicts { .. }
1145            | ColumnNotFound { .. }
1146            | ColumnIdMismatch { .. }
1147            | TimestampMismatch { .. } => StatusCode::Unexpected,
1148
1149            Unsupported { .. } => StatusCode::Unsupported,
1150            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1151
1152            SerdeJson { .. }
1153            | ParseOption { .. }
1154            | RouteInfoCorrupted { .. }
1155            | InvalidProtoMsg { .. }
1156            | InvalidMetadata { .. }
1157            | Unexpected { .. }
1158            | TableInfoNotFound { .. }
1159            | NextSequence { .. }
1160            | UnexpectedSequenceValue { .. }
1161            | InvalidHeartbeatResponse { .. }
1162            | EncodeJson { .. }
1163            | DecodeJson { .. }
1164            | PayloadNotExist { .. }
1165            | ConvertRawKey { .. }
1166            | DecodeProto { .. }
1167            | BuildTableMeta { .. }
1168            | TableRouteNotFound { .. }
1169            | TableRepartNotFound { .. }
1170            | RegionOperatingRace { .. }
1171            | EncodeWalOptions { .. }
1172            | BuildKafkaClient { .. }
1173            | BuildKafkaCtrlClient { .. }
1174            | KafkaPartitionClient { .. }
1175            | ProduceRecord { .. }
1176            | CreateKafkaWalTopic { .. }
1177            | EmptyTopicPool { .. }
1178            | UnexpectedLogicalRouteTable { .. }
1179            | ProcedureOutput { .. }
1180            | FromUtf8 { .. }
1181            | MetadataCorruption { .. }
1182            | ParseWalOptions { .. }
1183            | KafkaGetOffset { .. }
1184            | ReadFlexbuffers { .. }
1185            | SerializeFlexbuffers { .. }
1186            | DeserializeFlexbuffers { .. }
1187            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1188
1189            GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1190
1191            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1192
1193            ProcedureNotFound { .. }
1194            | InvalidViewInfo { .. }
1195            | PrimaryKeyNotFound { .. }
1196            | EmptyKey { .. }
1197            | AlterLogicalTablesInvalidArguments { .. }
1198            | CreateLogicalTablesInvalidArguments { .. }
1199            | MismatchPrefix { .. }
1200            | TlsConfig { .. }
1201            | InvalidSetDatabaseOption { .. }
1202            | InvalidUnsetDatabaseOption { .. }
1203            | InvalidTopicNamePrefix { .. }
1204            | InvalidFileExtension { .. }
1205            | InvalidFileName { .. }
1206            | InvalidFlowRequestBody { .. }
1207            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1208
1209            #[cfg(feature = "enterprise")]
1210            MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1211                StatusCode::InvalidArguments
1212            }
1213
1214            FlowNotFound { .. } => StatusCode::FlowNotFound,
1215            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1216            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1217
1218            ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1219                StatusCode::TableNotFound
1220            }
1221            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1222
1223            SubmitProcedure { source, .. }
1224            | QueryProcedure { source, .. }
1225            | WaitProcedure { source, .. }
1226            | StartProcedureManager { source, .. }
1227            | StopProcedureManager { source, .. } => source.status_code(),
1228            RegisterProcedureLoader { source, .. } => source.status_code(),
1229            External { source, .. } => source.status_code(),
1230            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1231            OperateDatanode { source, .. } => source.status_code(),
1232            Table { source, .. } => source.status_code(),
1233            RetryLater { source, .. } => source.status_code(),
1234            AbortProcedure { source, .. } => source.status_code(),
1235            ConvertAlterTableRequest { source, .. } => source.status_code(),
1236            PutPoison { source, .. } => source.status_code(),
1237            ConvertColumnDef { source, .. } => source.status_code(),
1238            ProcedureStateReceiver { source, .. } => source.status_code(),
1239            RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
1240            CreateRepartitionProcedure { source, .. } => source.status_code(),
1241
1242            ParseProcedureId { .. }
1243            | InvalidNumTopics { .. }
1244            | SchemaNotFound { .. }
1245            | CatalogNotFound { .. }
1246            | InvalidNodeInfoKey { .. }
1247            | InvalidStatKey { .. }
1248            | ParseNum { .. }
1249            | InvalidRole { .. }
1250            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1251
1252            LoadTlsCertificate { .. } => StatusCode::Internal,
1253
1254            #[cfg(feature = "pg_kvbackend")]
1255            PostgresExecution { .. }
1256            | CreatePostgresPool { .. }
1257            | GetPostgresConnection { .. }
1258            | GetPostgresClient { .. }
1259            | PostgresTransaction { .. }
1260            | PostgresTlsConfig { .. }
1261            | InvalidTlsConfig { .. } => StatusCode::Internal,
1262            #[cfg(feature = "mysql_kvbackend")]
1263            MySqlExecution { .. }
1264            | CreateMySqlPool { .. }
1265            | DecodeSqlValue { .. }
1266            | AcquireMySqlClient { .. }
1267            | MySqlTransaction { .. } => StatusCode::Internal,
1268            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1269            RdsTransactionRetryFailed { .. } | SqlExecutionTimeout { .. } => StatusCode::Internal,
1270            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1271        }
1272    }
1273
1274    fn as_any(&self) -> &dyn std::any::Any {
1275        self
1276    }
1277}
1278
1279impl Error {
1280    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1281    /// Check if the error is a serialization error.
1282    pub fn is_serialization_error(&self) -> bool {
1283        match self {
1284            #[cfg(feature = "pg_kvbackend")]
1285            Error::PostgresTransaction { error, .. } => {
1286                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1287            }
1288            #[cfg(feature = "pg_kvbackend")]
1289            Error::PostgresExecution { error, .. } => {
1290                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1291            }
1292            #[cfg(feature = "mysql_kvbackend")]
1293            Error::MySqlExecution {
1294                error: sqlx::Error::Database(database_error),
1295                ..
1296            } => {
1297                matches!(
1298                    database_error.message(),
1299                    "Deadlock found when trying to get lock; try restarting transaction"
1300                        | "can't serialize access for this transaction"
1301                )
1302            }
1303            _ => false,
1304        }
1305    }
1306
1307    /// Creates a new [Error::RetryLater] error from source `err`.
1308    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1309        Error::RetryLater {
1310            source: BoxedError::new(err),
1311            clean_poisons: false,
1312        }
1313    }
1314
1315    /// Determine whether it is a retry later type through [StatusCode]
1316    pub fn is_retry_later(&self) -> bool {
1317        matches!(
1318            self,
1319            Error::RetryLater { .. } | Error::GetLatestCacheRetryExceeded { .. }
1320        )
1321    }
1322
1323    /// Determine whether it needs to clean poisons.
1324    pub fn need_clean_poisons(&self) -> bool {
1325        matches!(
1326            self,
1327            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1328        ) || matches!(
1329            self,
1330            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1331        )
1332    }
1333
1334    /// Returns true if the response exceeds the size limit.
1335    pub fn is_exceeded_size_limit(&self) -> bool {
1336        match self {
1337            Error::EtcdFailed {
1338                error: etcd_client::Error::GRpcStatus(status),
1339                ..
1340            } => status.code() == tonic::Code::OutOfRange,
1341            Error::ResponseExceededSizeLimit { .. } => true,
1342            _ => false,
1343        }
1344    }
1345}