Skip to main content

servers/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::net::SocketAddr;
17use std::string::FromUtf8Error;
18use std::sync::Arc;
19
20use axum::http::StatusCode as HttpStatusCode;
21use axum::response::{IntoResponse, Response};
22use axum::{Json, http};
23use base64::DecodeError;
24use common_base::readable_size::ReadableSize;
25use common_error::define_into_tonic_status;
26use common_error::ext::{BoxedError, ErrorExt};
27use common_error::status_code::StatusCode;
28use common_macro::stack_trace_debug;
29use common_telemetry::{error, warn};
30use datafusion::error::DataFusionError;
31use datatypes::prelude::ConcreteDataType;
32use headers::ContentType;
33use http::header::InvalidHeaderValue;
34use query::parser::PromQuery;
35use serde_json::json;
36use snafu::{Location, Snafu};
37
38#[derive(Snafu)]
39#[snafu(visibility(pub))]
40#[stack_trace_debug]
41pub enum Error {
42    #[snafu(display("Failed to bind address: {}", addr))]
43    AddressBind {
44        addr: SocketAddr,
45        #[snafu(source)]
46        error: std::io::Error,
47        #[snafu(implicit)]
48        location: Location,
49    },
50
51    #[snafu(display("Arrow error"))]
52    Arrow {
53        #[snafu(source)]
54        error: arrow_schema::ArrowError,
55        #[snafu(implicit)]
56        location: Location,
57    },
58
59    #[snafu(display("Internal error: {}", err_msg))]
60    Internal { err_msg: String },
61
62    #[snafu(display("Pending rows batcher channel closed"))]
63    BatcherChannelClosed,
64
65    #[snafu(display("Unsupported data type: {}, reason: {}", data_type, reason))]
66    UnsupportedDataType {
67        data_type: ConcreteDataType,
68        reason: String,
69    },
70
71    #[snafu(display("Internal IO error"))]
72    InternalIo {
73        #[snafu(source)]
74        error: std::io::Error,
75    },
76
77    #[snafu(display("Tokio IO error: {}", err_msg))]
78    TokioIo {
79        err_msg: String,
80        #[snafu(source)]
81        error: std::io::Error,
82    },
83
84    #[snafu(display("Failed to collect recordbatch"))]
85    CollectRecordbatch {
86        #[snafu(implicit)]
87        location: Location,
88        source: common_recordbatch::error::Error,
89    },
90
91    #[snafu(display("Failed to start HTTP server"))]
92    StartHttp {
93        #[snafu(source)]
94        error: hyper::Error,
95    },
96
97    #[snafu(display("Failed to start gRPC server"))]
98    StartGrpc {
99        #[snafu(source)]
100        error: tonic::transport::Error,
101    },
102
103    #[snafu(display("Request memory limit exceeded"))]
104    MemoryLimitExceeded {
105        #[snafu(implicit)]
106        location: Location,
107        source: common_memory_manager::Error,
108    },
109
110    #[snafu(display("{} server is already started", server))]
111    AlreadyStarted {
112        server: String,
113        #[snafu(implicit)]
114        location: Location,
115    },
116
117    #[snafu(display("Failed to bind address {}", addr))]
118    TcpBind {
119        addr: SocketAddr,
120        #[snafu(source)]
121        error: std::io::Error,
122    },
123
124    #[snafu(display("Failed to execute query"))]
125    ExecuteQuery {
126        #[snafu(implicit)]
127        location: Location,
128        source: BoxedError,
129    },
130
131    #[snafu(display("Failed to execute plan"))]
132    ExecutePlan {
133        #[snafu(implicit)]
134        location: Location,
135        source: BoxedError,
136    },
137
138    #[snafu(display("Execute gRPC query error"))]
139    ExecuteGrpcQuery {
140        #[snafu(implicit)]
141        location: Location,
142        source: BoxedError,
143    },
144
145    #[snafu(display("Execute gRPC request error"))]
146    ExecuteGrpcRequest {
147        #[snafu(implicit)]
148        location: Location,
149        source: BoxedError,
150    },
151
152    #[snafu(display("Failed to check database validity"))]
153    CheckDatabaseValidity {
154        #[snafu(implicit)]
155        location: Location,
156        source: BoxedError,
157    },
158
159    #[snafu(display("Failed to describe statement"))]
160    DescribeStatement { source: BoxedError },
161
162    #[snafu(display("Pipeline error"))]
163    Pipeline {
164        #[snafu(source)]
165        source: pipeline::error::Error,
166        #[snafu(implicit)]
167        location: Location,
168    },
169
170    #[snafu(display("Not supported: {}", feat))]
171    NotSupported { feat: String },
172
173    #[snafu(display("Invalid request parameter: {}", reason))]
174    InvalidParameter {
175        reason: String,
176        #[snafu(implicit)]
177        location: Location,
178    },
179
180    #[snafu(display(
181        "Too many concurrent large requests, limit: {}, request size: {}",
182        ReadableSize(*limit as u64),
183        ReadableSize(*request_size as u64)
184    ))]
185    TooManyConcurrentRequests {
186        limit: usize,
187        request_size: usize,
188        #[snafu(implicit)]
189        location: Location,
190    },
191
192    #[snafu(display("Invalid query: {}", reason))]
193    InvalidQuery {
194        reason: String,
195        #[snafu(implicit)]
196        location: Location,
197    },
198
199    #[snafu(display("Failed to parse query"))]
200    FailedToParseQuery {
201        #[snafu(implicit)]
202        location: Location,
203        source: sql::error::Error,
204    },
205
206    #[snafu(display("Failed to parse InfluxDB line protocol"))]
207    InfluxdbLineProtocol {
208        #[snafu(implicit)]
209        location: Location,
210        #[snafu(source)]
211        error: influxdb_line_protocol::Error,
212    },
213
214    #[snafu(display("Failed to write row"))]
215    RowWriter {
216        #[snafu(implicit)]
217        location: Location,
218        source: common_grpc::error::Error,
219    },
220
221    #[snafu(display("Failed to convert time precision, name: {}", name))]
222    TimePrecision {
223        name: String,
224        #[snafu(implicit)]
225        location: Location,
226    },
227
228    #[snafu(display("Invalid OpenTSDB Json request"))]
229    InvalidOpentsdbJsonRequest {
230        #[snafu(source)]
231        error: serde_json::error::Error,
232        #[snafu(implicit)]
233        location: Location,
234    },
235
236    #[snafu(display("Failed to decode prometheus remote request"))]
237    DecodePromRemoteRequest {
238        #[snafu(implicit)]
239        location: Location,
240        #[snafu(source)]
241        error: prost::DecodeError,
242    },
243
244    #[snafu(display(
245        "Failed to decode OTLP request (content-type: {content_type}): {error}. The endpoint only accepts 'application/x-protobuf' format."
246    ))]
247    DecodeOtlpRequest {
248        content_type: String,
249        #[snafu(implicit)]
250        location: Location,
251        #[snafu(source)]
252        error: prost::DecodeError,
253    },
254
255    #[snafu(display("Failed to decode Loki request: {error}"))]
256    DecodeLokiRequest {
257        #[snafu(implicit)]
258        location: Location,
259        #[snafu(source)]
260        error: prost::DecodeError,
261    },
262
263    #[snafu(display(
264        "Unsupported content type 'application/json'. OTLP endpoint only supports 'application/x-protobuf'. Please configure your OTLP exporter to use protobuf encoding."
265    ))]
266    UnsupportedJsonContentType {
267        #[snafu(implicit)]
268        location: Location,
269    },
270
271    #[snafu(display(
272        "OTLP metric input have incompatible existing tables, please refer to docs for details"
273    ))]
274    OtlpMetricModeIncompatible {
275        #[snafu(implicit)]
276        location: Location,
277    },
278
279    #[snafu(display("Common Meta error"))]
280    CommonMeta {
281        #[snafu(implicit)]
282        location: Location,
283        #[snafu(source)]
284        source: common_meta::error::Error,
285    },
286
287    #[snafu(display("Failed to decompress snappy prometheus remote request"))]
288    DecompressSnappyPromRemoteRequest {
289        #[snafu(implicit)]
290        location: Location,
291        #[snafu(source)]
292        error: snap::Error,
293    },
294
295    #[snafu(display("Failed to decompress zstd prometheus remote request"))]
296    DecompressZstdPromRemoteRequest {
297        #[snafu(implicit)]
298        location: Location,
299        #[snafu(source)]
300        error: std::io::Error,
301    },
302
303    #[snafu(display("Failed to compress prometheus remote request"))]
304    CompressPromRemoteRequest {
305        #[snafu(implicit)]
306        location: Location,
307        #[snafu(source)]
308        error: snap::Error,
309    },
310
311    #[snafu(display("Invalid prometheus remote request, msg: {}", msg))]
312    InvalidPromRemoteRequest {
313        msg: String,
314        #[snafu(implicit)]
315        location: Location,
316    },
317
318    #[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))]
319    InvalidPromRemoteReadQueryResult {
320        msg: String,
321        #[snafu(implicit)]
322        location: Location,
323    },
324
325    #[snafu(display("Invalid Flight ticket"))]
326    InvalidFlightTicket {
327        #[snafu(source)]
328        error: api::DecodeError,
329        #[snafu(implicit)]
330        location: Location,
331    },
332
333    #[snafu(display("Tls is required for {}, plain connection is rejected", server))]
334    TlsRequired { server: String },
335
336    #[snafu(display("Failed to get user info"))]
337    Auth {
338        #[snafu(implicit)]
339        location: Location,
340        source: auth::error::Error,
341    },
342
343    #[snafu(display("Not found http or grpc authorization header"))]
344    NotFoundAuthHeader {},
345
346    #[snafu(display("Not found influx http authorization info"))]
347    NotFoundInfluxAuth {},
348
349    #[snafu(display("Unsupported http auth scheme, name: {}", name))]
350    UnsupportedAuthScheme { name: String },
351
352    #[snafu(display("Invalid visibility ASCII chars"))]
353    InvalidAuthHeaderInvisibleASCII {
354        #[snafu(source)]
355        error: hyper::header::ToStrError,
356        #[snafu(implicit)]
357        location: Location,
358    },
359
360    #[snafu(display("Invalid utf-8 value"))]
361    InvalidAuthHeaderInvalidUtf8Value {
362        #[snafu(source)]
363        error: FromUtf8Error,
364        #[snafu(implicit)]
365        location: Location,
366    },
367
368    #[snafu(display("Invalid http authorization header"))]
369    InvalidAuthHeader {
370        #[snafu(implicit)]
371        location: Location,
372    },
373
374    #[snafu(display("Invalid base64 value"))]
375    InvalidBase64Value {
376        #[snafu(source)]
377        error: DecodeError,
378        #[snafu(implicit)]
379        location: Location,
380    },
381
382    #[snafu(display("Invalid utf-8 value"))]
383    InvalidUtf8Value {
384        #[snafu(source)]
385        error: FromUtf8Error,
386        #[snafu(implicit)]
387        location: Location,
388    },
389
390    #[snafu(display("Invalid http header value"))]
391    InvalidHeaderValue {
392        #[snafu(source)]
393        error: InvalidHeaderValue,
394        #[snafu(implicit)]
395        location: Location,
396    },
397
398    #[snafu(transparent)]
399    Catalog {
400        source: catalog::error::Error,
401        #[snafu(implicit)]
402        location: Location,
403    },
404
405    #[snafu(display("Cannot find requested table: {}.{}.{}", catalog, schema, table))]
406    TableNotFound {
407        catalog: String,
408        schema: String,
409        table: String,
410        #[snafu(implicit)]
411        location: Location,
412    },
413
414    #[cfg(feature = "mem-prof")]
415    #[snafu(display("Failed to dump profile data"))]
416    DumpProfileData {
417        #[snafu(implicit)]
418        location: Location,
419        source: common_mem_prof::error::Error,
420    },
421
422    #[snafu(display("Invalid prepare statement: {}", err_msg))]
423    InvalidPrepareStatement {
424        err_msg: String,
425        #[snafu(implicit)]
426        location: Location,
427    },
428
429    #[snafu(display("Failed to build HTTP response"))]
430    BuildHttpResponse {
431        #[snafu(source)]
432        error: http::Error,
433        #[snafu(implicit)]
434        location: Location,
435    },
436
437    #[snafu(display("Failed to parse PromQL: {query:?}"))]
438    ParsePromQL {
439        query: Box<PromQuery>,
440        #[snafu(implicit)]
441        location: Location,
442        source: query::error::Error,
443    },
444
445    #[snafu(display("Failed to parse timestamp: {}", timestamp))]
446    ParseTimestamp {
447        timestamp: String,
448        #[snafu(implicit)]
449        location: Location,
450        #[snafu(source)]
451        error: query::error::Error,
452    },
453
454    #[snafu(display("Failed to infer parameter types"))]
455    InferParameterTypes {
456        #[snafu(implicit)]
457        location: Location,
458        #[snafu(source)]
459        error: query::error::Error,
460    },
461
462    #[snafu(display("{}", reason))]
463    UnexpectedResult {
464        reason: String,
465        #[snafu(implicit)]
466        location: Location,
467    },
468
469    // this error is used for custom error mapping
470    // please do not delete it
471    #[snafu(display("Other error"))]
472    Other {
473        source: BoxedError,
474        #[snafu(implicit)]
475        location: Location,
476    },
477
478    #[snafu(display("Failed to join task"))]
479    JoinTask {
480        #[snafu(source)]
481        error: tokio::task::JoinError,
482        #[snafu(implicit)]
483        location: Location,
484    },
485
486    #[cfg(feature = "pprof")]
487    #[snafu(display("Failed to dump pprof data"))]
488    DumpPprof { source: common_pprof::error::Error },
489
490    #[cfg(not(windows))]
491    #[snafu(display("Failed to update jemalloc metrics"))]
492    UpdateJemallocMetrics {
493        #[snafu(source)]
494        error: tikv_jemalloc_ctl::Error,
495        #[snafu(implicit)]
496        location: Location,
497    },
498
499    #[snafu(display("DataFrame operation error"))]
500    DataFrame {
501        #[snafu(source)]
502        error: datafusion::error::DataFusionError,
503        #[snafu(implicit)]
504        location: Location,
505    },
506
507    #[snafu(display("Failed to convert scalar value"))]
508    ConvertScalarValue {
509        source: datatypes::error::Error,
510        #[snafu(implicit)]
511        location: Location,
512    },
513
514    #[snafu(display("Expected type: {:?}, actual: {:?}", expected, actual))]
515    PreparedStmtTypeMismatch {
516        expected: ConcreteDataType,
517        actual: opensrv_mysql::ColumnType,
518        #[snafu(implicit)]
519        location: Location,
520    },
521
522    #[snafu(display(
523        "Column: {}, {} incompatible, expected: {}, actual: {}",
524        column_name,
525        datatype,
526        expected,
527        actual
528    ))]
529    IncompatibleSchema {
530        column_name: String,
531        datatype: String,
532        expected: i32,
533        actual: i32,
534        #[snafu(implicit)]
535        location: Location,
536    },
537
538    #[snafu(display("Failed to convert to json"))]
539    ToJson {
540        #[snafu(source)]
541        error: serde_json::error::Error,
542        #[snafu(implicit)]
543        location: Location,
544    },
545
546    #[snafu(display("Failed to parse payload as json"))]
547    ParseJson {
548        #[snafu(source)]
549        error: serde_json::error::Error,
550        #[snafu(implicit)]
551        location: Location,
552    },
553
554    #[snafu(display("Invalid Loki labels: {}", msg))]
555    InvalidLokiLabels {
556        msg: String,
557        #[snafu(implicit)]
558        location: Location,
559    },
560
561    #[snafu(display("Invalid Loki JSON request: {}", msg))]
562    InvalidLokiPayload {
563        msg: String,
564        #[snafu(implicit)]
565        location: Location,
566    },
567
568    #[snafu(display("Unsupported content type: {:?}", content_type))]
569    UnsupportedContentType {
570        content_type: ContentType,
571        #[snafu(implicit)]
572        location: Location,
573    },
574
575    #[snafu(display("Failed to decode url"))]
576    UrlDecode {
577        #[snafu(source)]
578        error: FromUtf8Error,
579        #[snafu(implicit)]
580        location: Location,
581    },
582
583    #[snafu(display("Failed to convert Mysql value, error: {}", err_msg))]
584    MysqlValueConversion {
585        err_msg: String,
586        #[snafu(implicit)]
587        location: Location,
588    },
589
590    #[snafu(display("Invalid table name"))]
591    InvalidTableName {
592        #[snafu(source)]
593        error: tonic::metadata::errors::ToStrError,
594        #[snafu(implicit)]
595        location: Location,
596    },
597
598    #[snafu(display("Failed to initialize a watcher for file {}", path))]
599    FileWatch {
600        path: String,
601        #[snafu(source)]
602        error: notify::Error,
603    },
604
605    #[snafu(display("Timestamp overflow: {}", error))]
606    TimestampOverflow {
607        error: String,
608        #[snafu(implicit)]
609        location: Location,
610    },
611
612    #[snafu(display("Unsupported json data type for tag: {} {}", key, ty))]
613    UnsupportedJsonDataTypeForTag {
614        key: String,
615        ty: String,
616        #[snafu(implicit)]
617        location: Location,
618    },
619
620    #[snafu(display("Convert SQL value error"))]
621    ConvertSqlValue {
622        source: datatypes::error::Error,
623        #[snafu(implicit)]
624        location: Location,
625    },
626
627    #[snafu(display("Prepare statement not found: {}", name))]
628    PrepareStatementNotFound {
629        name: String,
630        #[snafu(implicit)]
631        location: Location,
632    },
633
634    #[snafu(display("Invalid elasticsearch input, reason: {}", reason))]
635    InvalidElasticsearchInput {
636        reason: String,
637        #[snafu(implicit)]
638        location: Location,
639    },
640
641    #[snafu(display("Invalid Jaeger query, reason: {}", reason))]
642    InvalidJaegerQuery {
643        reason: String,
644        #[snafu(implicit)]
645        location: Location,
646    },
647
648    #[snafu(display("DataFusion error"))]
649    DataFusion {
650        #[snafu(source)]
651        error: DataFusionError,
652        #[snafu(implicit)]
653        location: Location,
654    },
655
656    #[snafu(display("Failed to handle otel-arrow request, error message: {}", err_msg))]
657    HandleOtelArrowRequest {
658        err_msg: String,
659        #[snafu(implicit)]
660        location: Location,
661    },
662
663    #[snafu(display("Unknown hint: {}", hint))]
664    UnknownHint { hint: String },
665
666    #[snafu(display("Query has been cancelled"))]
667    Cancelled {
668        #[snafu(implicit)]
669        location: Location,
670    },
671
672    #[snafu(display("Service suspended"))]
673    Suspended {
674        #[snafu(implicit)]
675        location: Location,
676    },
677
678    #[snafu(transparent)]
679    GreptimeProto {
680        source: api::error::Error,
681        #[snafu(implicit)]
682        location: Location,
683    },
684
685    #[snafu(transparent)]
686    DataTypes {
687        source: datatypes::error::Error,
688        #[snafu(implicit)]
689        location: Location,
690    },
691
692    #[snafu(transparent)]
693    Partition {
694        source: partition::error::Error,
695        #[snafu(implicit)]
696        location: Location,
697    },
698
699    #[snafu(transparent)]
700    MetricEngine {
701        source: metric_engine::error::Error,
702        #[snafu(implicit)]
703        location: Location,
704    },
705
706    #[snafu(display("Failed to submit batch: {}", source))]
707    SubmitBatch { source: Arc<Error> },
708}
709
710pub type Result<T, E = Error> = std::result::Result<T, E>;
711
712impl ErrorExt for Error {
713    fn status_code(&self) -> StatusCode {
714        use Error::*;
715        match self {
716            Internal { .. }
717            | BatcherChannelClosed
718            | InternalIo { .. }
719            | TokioIo { .. }
720            | StartHttp { .. }
721            | StartGrpc { .. }
722            | TcpBind { .. }
723            | BuildHttpResponse { .. }
724            | Arrow { .. }
725            | FileWatch { .. } => StatusCode::Internal,
726
727            AddressBind { .. }
728            | AlreadyStarted { .. }
729            | InvalidPromRemoteReadQueryResult { .. }
730            | OtlpMetricModeIncompatible { .. } => StatusCode::IllegalState,
731
732            UnsupportedDataType { .. } => StatusCode::Unsupported,
733
734            #[cfg(not(windows))]
735            UpdateJemallocMetrics { .. } => StatusCode::Internal,
736
737            CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
738
739            ExecuteQuery { source, .. }
740            | ExecutePlan { source, .. }
741            | ExecuteGrpcQuery { source, .. }
742            | ExecuteGrpcRequest { source, .. }
743            | CheckDatabaseValidity { source, .. } => source.status_code(),
744
745            Pipeline { source, .. } => source.status_code(),
746            CommonMeta { source, .. } => source.status_code(),
747
748            NotSupported { .. }
749            | InvalidParameter { .. }
750            | InvalidQuery { .. }
751            | InfluxdbLineProtocol { .. }
752            | InvalidOpentsdbJsonRequest { .. }
753            | DecodePromRemoteRequest { .. }
754            | DecodeOtlpRequest { .. }
755            | DecodeLokiRequest { .. }
756            | UnsupportedJsonContentType { .. }
757            | CompressPromRemoteRequest { .. }
758            | DecompressSnappyPromRemoteRequest { .. }
759            | DecompressZstdPromRemoteRequest { .. }
760            | InvalidPromRemoteRequest { .. }
761            | InvalidFlightTicket { .. }
762            | InvalidPrepareStatement { .. }
763            | InferParameterTypes { .. }
764            | DataFrame { .. }
765            | PreparedStmtTypeMismatch { .. }
766            | TimePrecision { .. }
767            | UrlDecode { .. }
768            | IncompatibleSchema { .. }
769            | MysqlValueConversion { .. }
770            | ParseJson { .. }
771            | InvalidLokiLabels { .. }
772            | InvalidLokiPayload { .. }
773            | UnsupportedContentType { .. }
774            | TimestampOverflow { .. }
775            | UnsupportedJsonDataTypeForTag { .. }
776            | InvalidTableName { .. }
777            | PrepareStatementNotFound { .. }
778            | FailedToParseQuery { .. }
779            | InvalidElasticsearchInput { .. }
780            | InvalidJaegerQuery { .. }
781            | ParseTimestamp { .. }
782            | UnknownHint { .. } => StatusCode::InvalidArguments,
783
784            Catalog { source, .. } => source.status_code(),
785            RowWriter { source, .. } => source.status_code(),
786            DataTypes { source, .. } => source.status_code(),
787
788            TlsRequired { .. } => StatusCode::Unknown,
789            Auth { source, .. } => source.status_code(),
790            DescribeStatement { source } => source.status_code(),
791
792            NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound,
793            InvalidAuthHeaderInvisibleASCII { .. }
794            | UnsupportedAuthScheme { .. }
795            | InvalidAuthHeader { .. }
796            | InvalidBase64Value { .. }
797            | InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
798
799            TableNotFound { .. } => StatusCode::TableNotFound,
800
801            #[cfg(feature = "mem-prof")]
802            DumpProfileData { source, .. } => source.status_code(),
803
804            InvalidUtf8Value { .. } | InvalidHeaderValue { .. } => StatusCode::InvalidArguments,
805
806            TooManyConcurrentRequests { .. } => StatusCode::RuntimeResourcesExhausted,
807
808            ParsePromQL { source, .. } => source.status_code(),
809            Other { source, .. } => source.status_code(),
810
811            UnexpectedResult { .. } => StatusCode::Unexpected,
812
813            JoinTask { error, .. } => {
814                if error.is_cancelled() {
815                    StatusCode::Cancelled
816                } else if error.is_panic() {
817                    StatusCode::Unexpected
818                } else {
819                    StatusCode::Unknown
820                }
821            }
822
823            #[cfg(feature = "pprof")]
824            DumpPprof { source, .. } => source.status_code(),
825
826            ConvertScalarValue { source, .. } => source.status_code(),
827
828            ToJson { .. } | DataFusion { .. } => StatusCode::Internal,
829
830            ConvertSqlValue { source, .. } => source.status_code(),
831
832            HandleOtelArrowRequest { .. } => StatusCode::Internal,
833
834            Cancelled { .. } => StatusCode::Cancelled,
835
836            Suspended { .. } => StatusCode::Suspended,
837
838            MemoryLimitExceeded { .. } => StatusCode::RateLimited,
839
840            GreptimeProto { source, .. } => source.status_code(),
841            Partition { source, .. } => source.status_code(),
842            MetricEngine { source, .. } => source.status_code(),
843            SubmitBatch { source, .. } => source.status_code(),
844        }
845    }
846
847    fn as_any(&self) -> &dyn Any {
848        self
849    }
850}
851
852define_into_tonic_status!(Error);
853
854impl From<std::io::Error> for Error {
855    fn from(e: std::io::Error) -> Self {
856        Error::InternalIo { error: e }
857    }
858}
859
860fn log_error_if_necessary(error: &Error) {
861    if error.status_code().should_log_error() {
862        error!(error; "Failed to handle HTTP request ");
863    } else {
864        warn!(error; "Failed to handle HTTP request ");
865    }
866}
867
868impl IntoResponse for Error {
869    fn into_response(self) -> Response {
870        let error_msg = self.output_msg();
871        let status = status_code_to_http_status(&self.status_code());
872
873        log_error_if_necessary(&self);
874
875        let body = Json(json!({
876            "error": error_msg,
877        }));
878        (status, body).into_response()
879    }
880}
881
882/// Converts [StatusCode] to [HttpStatusCode].
883pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
884    match status_code {
885        StatusCode::Success => HttpStatusCode::OK,
886
887        // When a request is cancelled by the client (e.g., by a client side timeout),
888        // we should return a gateway timeout status code to the external client.
889        StatusCode::Cancelled | StatusCode::DeadlineExceeded => HttpStatusCode::GATEWAY_TIMEOUT,
890
891        StatusCode::Unsupported
892        | StatusCode::InvalidArguments
893        | StatusCode::InvalidSyntax
894        | StatusCode::RequestOutdated
895        | StatusCode::RegionAlreadyExists
896        | StatusCode::TableColumnExists
897        | StatusCode::TableAlreadyExists
898        | StatusCode::RegionNotFound
899        | StatusCode::DatabaseNotFound
900        | StatusCode::TableNotFound
901        | StatusCode::TableColumnNotFound
902        | StatusCode::PlanQuery
903        | StatusCode::DatabaseAlreadyExists
904        | StatusCode::TriggerAlreadyExists
905        | StatusCode::TriggerNotFound
906        | StatusCode::FlowNotFound
907        | StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
908
909        StatusCode::AuthHeaderNotFound
910        | StatusCode::InvalidAuthHeader
911        | StatusCode::UserNotFound
912        | StatusCode::UnsupportedPasswordType
913        | StatusCode::UserPasswordMismatch
914        | StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED,
915
916        StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,
917
918        StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS,
919
920        StatusCode::RegionNotReady
921        | StatusCode::TableUnavailable
922        | StatusCode::RegionBusy
923        | StatusCode::StorageUnavailable
924        | StatusCode::External
925        | StatusCode::Suspended => HttpStatusCode::SERVICE_UNAVAILABLE,
926
927        StatusCode::Internal
928        | StatusCode::Unexpected
929        | StatusCode::IllegalState
930        | StatusCode::Unknown
931        | StatusCode::RuntimeResourcesExhausted
932        | StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR,
933    }
934}