Skip to main content

servers/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::net::SocketAddr;
17use std::string::FromUtf8Error;
18use std::sync::Arc;
19
20use axum::http::StatusCode as HttpStatusCode;
21use axum::response::{IntoResponse, Response};
22use axum::{Json, http};
23use base64::DecodeError;
24use common_base::readable_size::ReadableSize;
25use common_error::define_into_tonic_status;
26use common_error::ext::{BoxedError, ErrorExt};
27use common_error::status_code::StatusCode;
28use common_macro::stack_trace_debug;
29use common_telemetry::{error, warn};
30use datafusion::error::DataFusionError;
31use datatypes::prelude::ConcreteDataType;
32use headers::ContentType;
33use http::header::InvalidHeaderValue;
34use query::parser::PromQuery;
35use serde_json::json;
36use snafu::{Location, Snafu};
37
/// Error type declared by this file.
///
/// Every variant carries either a `#[snafu(display(...))]` message or
/// `#[snafu(transparent)]`. Fields named `source`, or tagged `#[snafu(source)]`,
/// hold the underlying error being wrapped; `location: Location` fields are
/// tagged `#[snafu(implicit)]`. The status code reported for each variant is
/// assigned in the `ErrorExt` implementation further down in this file.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    // NOTE(review): `AddressBind` and `TcpBind` (below) wrap the same pair of
    // fields with nearly identical messages, but `status_code` maps them to
    // different codes (`IllegalState` vs `Internal`).
    #[snafu(display("Failed to bind address: {}", addr))]
    AddressBind {
        addr: SocketAddr,
        #[snafu(source)]
        error: std::io::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Arrow error"))]
    Arrow {
        #[snafu(source)]
        error: arrow_schema::ArrowError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Internal error: {}", err_msg))]
    Internal { err_msg: String },

    #[snafu(display("Pending rows batcher channel closed"))]
    BatcherChannelClosed,

    #[snafu(display("Unsupported data type: {}, reason: {}", data_type, reason))]
    UnsupportedDataType {
        data_type: ConcreteDataType,
        reason: String,
    },

    // Also produced by the `From<std::io::Error>` impl in this file.
    #[snafu(display("Internal IO error"))]
    InternalIo {
        #[snafu(source)]
        error: std::io::Error,
    },

    #[snafu(display("Tokio IO error: {}", err_msg))]
    TokioIo {
        err_msg: String,
        #[snafu(source)]
        error: std::io::Error,
    },

    #[snafu(display("Failed to collect recordbatch"))]
    CollectRecordbatch {
        #[snafu(implicit)]
        location: Location,
        source: common_recordbatch::error::Error,
    },

    #[snafu(display("Failed to start HTTP server"))]
    StartHttp {
        #[snafu(source)]
        error: hyper::Error,
    },

    #[snafu(display("Failed to start gRPC server"))]
    StartGrpc {
        #[snafu(source)]
        error: tonic::transport::Error,
    },

    #[snafu(display("Request memory limit exceeded"))]
    MemoryLimitExceeded {
        #[snafu(implicit)]
        location: Location,
        source: common_memory_manager::Error,
    },

    #[snafu(display("{} server is already started", server))]
    AlreadyStarted {
        server: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to bind address {}", addr))]
    TcpBind {
        addr: SocketAddr,
        #[snafu(source)]
        error: std::io::Error,
    },

    // The following `BoxedError`-backed variants take their status code from
    // the boxed source (see `status_code`).
    #[snafu(display("Failed to execute query"))]
    ExecuteQuery {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    #[snafu(display("Failed to execute plan"))]
    ExecutePlan {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    #[snafu(display("Execute gRPC query error"))]
    ExecuteGrpcQuery {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    #[snafu(display("Execute gRPC request error"))]
    ExecuteGrpcRequest {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    #[snafu(display("Failed to check database validity"))]
    CheckDatabaseValidity {
        #[snafu(implicit)]
        location: Location,
        source: BoxedError,
    },

    #[snafu(display("Failed to describe statement"))]
    DescribeStatement { source: BoxedError },

    #[snafu(display("Pipeline error"))]
    Pipeline {
        #[snafu(source)]
        source: pipeline::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Not supported: {}", feat))]
    NotSupported { feat: String },

    #[snafu(display("Invalid request parameter: {}", reason))]
    InvalidParameter {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    // Both sizes are rendered through `ReadableSize` in the message.
    #[snafu(display(
        "Too many concurrent large requests, limit: {}, request size: {}",
        ReadableSize(*limit as u64),
        ReadableSize(*request_size as u64)
    ))]
    TooManyConcurrentRequests {
        limit: usize,
        request_size: usize,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid query: {}", reason))]
    InvalidQuery {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to parse query"))]
    FailedToParseQuery {
        #[snafu(implicit)]
        location: Location,
        source: sql::error::Error,
    },

    #[snafu(display("Failed to parse InfluxDB line protocol"))]
    InfluxdbLineProtocol {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: influxdb_line_protocol::Error,
    },

    #[snafu(display("Failed to write row"))]
    RowWriter {
        #[snafu(implicit)]
        location: Location,
        source: common_grpc::error::Error,
    },

    #[snafu(display("Failed to convert time precision, name: {}", name))]
    TimePrecision {
        name: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid OpenTSDB Json request"))]
    InvalidOpentsdbJsonRequest {
        #[snafu(source)]
        error: serde_json::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to decode prometheus remote request"))]
    DecodePromRemoteRequest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: prost::DecodeError,
    },

    // The message embeds both the received content type and the decode error.
    #[snafu(display(
        "Failed to decode OTLP request (content-type: {content_type}): {error}. The endpoint only accepts 'application/x-protobuf' format."
    ))]
    DecodeOtlpRequest {
        content_type: String,
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: prost::DecodeError,
    },

    #[snafu(display("Failed to decode Loki request: {error}"))]
    DecodeLokiRequest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: prost::DecodeError,
    },

    #[snafu(display(
        "Unsupported content type 'application/json'. OTLP endpoint only supports 'application/x-protobuf'. Please configure your OTLP exporter to use protobuf encoding."
    ))]
    UnsupportedJsonContentType {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "OTLP metric input have incompatible existing tables, please refer to docs for details"
    ))]
    OtlpMetricModeIncompatible {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Common Meta error"))]
    CommonMeta {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        source: common_meta::error::Error,
    },

    #[snafu(display("Failed to decompress snappy prometheus remote request"))]
    DecompressSnappyPromRemoteRequest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: snap::Error,
    },

    #[snafu(display("Failed to decompress snappy Loki request"))]
    DecompressSnappyLokiRequest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: snap::Error,
    },

    #[snafu(display("Failed to decompress zstd prometheus remote request"))]
    DecompressZstdPromRemoteRequest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: std::io::Error,
    },

    #[snafu(display("Failed to compress prometheus remote request"))]
    CompressPromRemoteRequest {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: snap::Error,
    },

    #[snafu(display("Invalid prometheus remote request, msg: {}", msg))]
    InvalidPromRemoteRequest {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))]
    InvalidPromRemoteReadQueryResult {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid Flight ticket"))]
    InvalidFlightTicket {
        #[snafu(source)]
        error: api::DecodeError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Tls is required for {}, plain connection is rejected", server))]
    TlsRequired { server: String },

    #[snafu(display("Failed to get user info"))]
    Auth {
        #[snafu(implicit)]
        location: Location,
        source: auth::error::Error,
    },

    // Authorization-header variants. `status_code` maps the two "not found"
    // variants to `AuthHeaderNotFound` and the remaining ones, up to and
    // including `InvalidBase64Value`, to `InvalidAuthHeader`.
    #[snafu(display("Not found http or grpc authorization header"))]
    NotFoundAuthHeader {},

    #[snafu(display("Not found influx http authorization info"))]
    NotFoundInfluxAuth {},

    #[snafu(display("Unsupported http auth scheme, name: {}", name))]
    UnsupportedAuthScheme { name: String },

    #[snafu(display("Invalid visibility ASCII chars"))]
    InvalidAuthHeaderInvisibleASCII {
        #[snafu(source)]
        error: hyper::header::ToStrError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid utf-8 value"))]
    InvalidAuthHeaderInvalidUtf8Value {
        #[snafu(source)]
        error: FromUtf8Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid http authorization header"))]
    InvalidAuthHeader {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid base64 value"))]
    InvalidBase64Value {
        #[snafu(source)]
        error: DecodeError,
        #[snafu(implicit)]
        location: Location,
    },

    // Same message and source type as `InvalidAuthHeaderInvalidUtf8Value`, but
    // `status_code` maps this one to `InvalidArguments`.
    #[snafu(display("Invalid utf-8 value"))]
    InvalidUtf8Value {
        #[snafu(source)]
        error: FromUtf8Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid http header value"))]
    InvalidHeaderValue {
        #[snafu(source)]
        error: InvalidHeaderValue,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(transparent)]
    Catalog {
        source: catalog::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Cannot find requested table: {}.{}.{}", catalog, schema, table))]
    TableNotFound {
        catalog: String,
        schema: String,
        table: String,
        #[snafu(implicit)]
        location: Location,
    },

    // Only compiled when the `mem-prof` feature is enabled.
    #[cfg(feature = "mem-prof")]
    #[snafu(display("Failed to dump profile data"))]
    DumpProfileData {
        #[snafu(implicit)]
        location: Location,
        source: common_mem_prof::error::Error,
    },

    #[snafu(display("Invalid prepare statement: {}", err_msg))]
    InvalidPrepareStatement {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to build HTTP response"))]
    BuildHttpResponse {
        #[snafu(source)]
        error: http::Error,
        #[snafu(implicit)]
        location: Location,
    },

    // The query is stored boxed and printed with its `Debug` representation;
    // presumably boxed to keep the enum small (TODO confirm).
    #[snafu(display("Failed to parse PromQL: {query:?}"))]
    ParsePromQL {
        query: Box<PromQuery>,
        #[snafu(implicit)]
        location: Location,
        source: query::error::Error,
    },

    #[snafu(display("Failed to parse timestamp: {}", timestamp))]
    ParseTimestamp {
        timestamp: String,
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: query::error::Error,
    },

    #[snafu(display("Failed to infer parameter types"))]
    InferParameterTypes {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: query::error::Error,
    },

    // The whole message is the caller-supplied `reason`.
    #[snafu(display("{}", reason))]
    UnexpectedResult {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    // This error is used for custom error mapping;
    // please do not delete it.
    #[snafu(display("Other error"))]
    Other {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },

    // `status_code` inspects the wrapped `JoinError` to tell cancellation
    // apart from a panic.
    #[snafu(display("Failed to join task"))]
    JoinTask {
        #[snafu(source)]
        error: tokio::task::JoinError,
        #[snafu(implicit)]
        location: Location,
    },

    // Only compiled when the `pprof` feature is enabled.
    #[cfg(feature = "pprof")]
    #[snafu(display("Failed to dump pprof data"))]
    DumpPprof { source: common_pprof::error::Error },

    // Not compiled on Windows targets.
    #[cfg(not(windows))]
    #[snafu(display("Failed to update jemalloc metrics"))]
    UpdateJemallocMetrics {
        #[snafu(source)]
        error: tikv_jemalloc_ctl::Error,
        #[snafu(implicit)]
        location: Location,
    },

    // NOTE(review): `DataFrame` and `DataFusion` (below) both wrap
    // `DataFusionError`, yet `status_code` maps them differently
    // (`InvalidArguments` vs `Internal`).
    #[snafu(display("DataFrame operation error"))]
    DataFrame {
        #[snafu(source)]
        error: datafusion::error::DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to convert scalar value"))]
    ConvertScalarValue {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Expected type: {:?}, actual: {:?}", expected, actual))]
    PreparedStmtTypeMismatch {
        expected: ConcreteDataType,
        actual: opensrv_mysql::ColumnType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Column: {}, {} incompatible, expected: {}, actual: {}",
        column_name,
        datatype,
        expected,
        actual
    ))]
    IncompatibleSchema {
        column_name: String,
        datatype: String,
        expected: i32,
        actual: i32,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to convert to json"))]
    ToJson {
        #[snafu(source)]
        error: serde_json::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to parse payload as json"))]
    ParseJson {
        #[snafu(source)]
        error: serde_json::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid Loki labels: {}", msg))]
    InvalidLokiLabels {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid Loki JSON request: {}", msg))]
    InvalidLokiPayload {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported content type: {:?}", content_type))]
    UnsupportedContentType {
        content_type: ContentType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to decode url"))]
    UrlDecode {
        #[snafu(source)]
        error: FromUtf8Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to convert Mysql value, error: {}", err_msg))]
    MysqlValueConversion {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid table name"))]
    InvalidTableName {
        #[snafu(source)]
        error: tonic::metadata::errors::ToStrError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to initialize a watcher for file {}", path))]
    FileWatch {
        path: String,
        #[snafu(source)]
        error: notify::Error,
    },

    // Here `error` is a plain `String` interpolated into the message; it is
    // not tagged `#[snafu(source)]`.
    #[snafu(display("Timestamp overflow: {}", error))]
    TimestampOverflow {
        error: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported json data type for tag: {} {}", key, ty))]
    UnsupportedJsonDataTypeForTag {
        key: String,
        ty: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Convert SQL value error"))]
    ConvertSqlValue {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Prepare statement not found: {}", name))]
    PrepareStatementNotFound {
        name: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid elasticsearch input, reason: {}", reason))]
    InvalidElasticsearchInput {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid Jaeger query, reason: {}", reason))]
    InvalidJaegerQuery {
        reason: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("DataFusion error"))]
    DataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to handle otel-arrow request, error message: {}", err_msg))]
    HandleOtelArrowRequest {
        err_msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unknown hint: {}", hint))]
    UnknownHint { hint: String },

    #[snafu(display("Query has been cancelled"))]
    Cancelled {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Service suspended"))]
    Suspended {
        #[snafu(implicit)]
        location: Location,
    },

    // Transparent wrappers around errors from other crates; `status_code`
    // forwards to the wrapped source for each of them.
    #[snafu(transparent)]
    GreptimeProto {
        source: api::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(transparent)]
    DataTypes {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(transparent)]
    Partition {
        source: partition::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(transparent)]
    MetricEngine {
        source: metric_engine::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    // Wraps another `Error` of this same type behind an `Arc`; the inner
    // error's text is appended to the message.
    #[snafu(display("Failed to submit batch: {}", source))]
    SubmitBatch { source: Arc<Error> },
}
717
718pub type Result<T, E = Error> = std::result::Result<T, E>;
719
720impl ErrorExt for Error {
721    fn status_code(&self) -> StatusCode {
722        use Error::*;
723        match self {
724            Internal { .. }
725            | BatcherChannelClosed
726            | InternalIo { .. }
727            | TokioIo { .. }
728            | StartHttp { .. }
729            | StartGrpc { .. }
730            | TcpBind { .. }
731            | BuildHttpResponse { .. }
732            | Arrow { .. }
733            | FileWatch { .. } => StatusCode::Internal,
734
735            AddressBind { .. }
736            | AlreadyStarted { .. }
737            | InvalidPromRemoteReadQueryResult { .. }
738            | OtlpMetricModeIncompatible { .. } => StatusCode::IllegalState,
739
740            UnsupportedDataType { .. } => StatusCode::Unsupported,
741
742            #[cfg(not(windows))]
743            UpdateJemallocMetrics { .. } => StatusCode::Internal,
744
745            CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
746
747            ExecuteQuery { source, .. }
748            | ExecutePlan { source, .. }
749            | ExecuteGrpcQuery { source, .. }
750            | ExecuteGrpcRequest { source, .. }
751            | CheckDatabaseValidity { source, .. } => source.status_code(),
752
753            Pipeline { source, .. } => source.status_code(),
754            CommonMeta { source, .. } => source.status_code(),
755
756            NotSupported { .. }
757            | InvalidParameter { .. }
758            | InvalidQuery { .. }
759            | InfluxdbLineProtocol { .. }
760            | InvalidOpentsdbJsonRequest { .. }
761            | DecodePromRemoteRequest { .. }
762            | DecodeOtlpRequest { .. }
763            | DecodeLokiRequest { .. }
764            | UnsupportedJsonContentType { .. }
765            | CompressPromRemoteRequest { .. }
766            | DecompressSnappyPromRemoteRequest { .. }
767            | DecompressSnappyLokiRequest { .. }
768            | DecompressZstdPromRemoteRequest { .. }
769            | InvalidPromRemoteRequest { .. }
770            | InvalidFlightTicket { .. }
771            | InvalidPrepareStatement { .. }
772            | InferParameterTypes { .. }
773            | DataFrame { .. }
774            | PreparedStmtTypeMismatch { .. }
775            | TimePrecision { .. }
776            | UrlDecode { .. }
777            | IncompatibleSchema { .. }
778            | MysqlValueConversion { .. }
779            | ParseJson { .. }
780            | InvalidLokiLabels { .. }
781            | InvalidLokiPayload { .. }
782            | UnsupportedContentType { .. }
783            | TimestampOverflow { .. }
784            | UnsupportedJsonDataTypeForTag { .. }
785            | InvalidTableName { .. }
786            | PrepareStatementNotFound { .. }
787            | FailedToParseQuery { .. }
788            | InvalidElasticsearchInput { .. }
789            | InvalidJaegerQuery { .. }
790            | ParseTimestamp { .. }
791            | UnknownHint { .. } => StatusCode::InvalidArguments,
792
793            Catalog { source, .. } => source.status_code(),
794            RowWriter { source, .. } => source.status_code(),
795            DataTypes { source, .. } => source.status_code(),
796
797            TlsRequired { .. } => StatusCode::Unknown,
798            Auth { source, .. } => source.status_code(),
799            DescribeStatement { source } => source.status_code(),
800
801            NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound,
802            InvalidAuthHeaderInvisibleASCII { .. }
803            | UnsupportedAuthScheme { .. }
804            | InvalidAuthHeader { .. }
805            | InvalidBase64Value { .. }
806            | InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
807
808            TableNotFound { .. } => StatusCode::TableNotFound,
809
810            #[cfg(feature = "mem-prof")]
811            DumpProfileData { source, .. } => source.status_code(),
812
813            InvalidUtf8Value { .. } | InvalidHeaderValue { .. } => StatusCode::InvalidArguments,
814
815            TooManyConcurrentRequests { .. } => StatusCode::RuntimeResourcesExhausted,
816
817            ParsePromQL { source, .. } => source.status_code(),
818            Other { source, .. } => source.status_code(),
819
820            UnexpectedResult { .. } => StatusCode::Unexpected,
821
822            JoinTask { error, .. } => {
823                if error.is_cancelled() {
824                    StatusCode::Cancelled
825                } else if error.is_panic() {
826                    StatusCode::Unexpected
827                } else {
828                    StatusCode::Unknown
829                }
830            }
831
832            #[cfg(feature = "pprof")]
833            DumpPprof { source, .. } => source.status_code(),
834
835            ConvertScalarValue { source, .. } => source.status_code(),
836
837            ToJson { .. } | DataFusion { .. } => StatusCode::Internal,
838
839            ConvertSqlValue { source, .. } => source.status_code(),
840
841            HandleOtelArrowRequest { .. } => StatusCode::Internal,
842
843            Cancelled { .. } => StatusCode::Cancelled,
844
845            Suspended { .. } => StatusCode::Suspended,
846
847            MemoryLimitExceeded { .. } => StatusCode::RateLimited,
848
849            GreptimeProto { source, .. } => source.status_code(),
850            Partition { source, .. } => source.status_code(),
851            MetricEngine { source, .. } => source.status_code(),
852            SubmitBatch { source, .. } => source.status_code(),
853        }
854    }
855
856    fn as_any(&self) -> &dyn Any {
857        self
858    }
859}
860
// Macro imported from `common_error`; presumably generates the conversion from
// `Error` into a tonic status (TODO confirm against the macro definition).
define_into_tonic_status!(Error);
862
863impl From<std::io::Error> for Error {
864    fn from(e: std::io::Error) -> Self {
865        Error::InternalIo { error: e }
866    }
867}
868
869fn log_error_if_necessary(error: &Error) {
870    if error.status_code().should_log_error() {
871        error!(error; "Failed to handle HTTP request ");
872    } else {
873        warn!(error; "Failed to handle HTTP request ");
874    }
875}
876
877impl IntoResponse for Error {
878    fn into_response(self) -> Response {
879        let error_msg = self.output_msg();
880        let status = status_code_to_http_status(&self.status_code());
881
882        log_error_if_necessary(&self);
883
884        let body = Json(json!({
885            "error": error_msg,
886        }));
887        (status, body).into_response()
888    }
889}
890
891/// Converts [StatusCode] to [HttpStatusCode].
892pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
893    match status_code {
894        StatusCode::Success => HttpStatusCode::OK,
895
896        // When a request is cancelled by the client (e.g., by a client side timeout),
897        // we should return a gateway timeout status code to the external client.
898        StatusCode::Cancelled | StatusCode::DeadlineExceeded => HttpStatusCode::GATEWAY_TIMEOUT,
899
900        StatusCode::Unsupported
901        | StatusCode::InvalidArguments
902        | StatusCode::InvalidSyntax
903        | StatusCode::RequestOutdated
904        | StatusCode::RegionAlreadyExists
905        | StatusCode::TableColumnExists
906        | StatusCode::TableAlreadyExists
907        | StatusCode::RegionNotFound
908        | StatusCode::DatabaseNotFound
909        | StatusCode::TableNotFound
910        | StatusCode::TableColumnNotFound
911        | StatusCode::PlanQuery
912        | StatusCode::DatabaseAlreadyExists
913        | StatusCode::TriggerAlreadyExists
914        | StatusCode::TriggerNotFound
915        | StatusCode::FlowNotFound
916        | StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
917
918        StatusCode::AuthHeaderNotFound
919        | StatusCode::InvalidAuthHeader
920        | StatusCode::UserNotFound
921        | StatusCode::UnsupportedPasswordType
922        | StatusCode::UserPasswordMismatch
923        | StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED,
924
925        StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,
926
927        StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS,
928
929        StatusCode::RegionNotReady
930        | StatusCode::TableUnavailable
931        | StatusCode::RegionBusy
932        | StatusCode::StorageUnavailable
933        | StatusCode::External
934        | StatusCode::Suspended => HttpStatusCode::SERVICE_UNAVAILABLE,
935
936        StatusCode::Internal
937        | StatusCode::Unexpected
938        | StatusCode::IllegalState
939        | StatusCode::Unknown
940        | StatusCode::RuntimeResourcesExhausted
941        | StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR,
942    }
943}