servers/http.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::net::SocketAddr;
18use std::sync::Mutex as StdMutex;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use auth::UserProviderRef;
23use axum::extract::DefaultBodyLimit;
24use axum::http::StatusCode as HttpStatusCode;
25use axum::response::{IntoResponse, Response};
26use axum::serve::ListenerExt;
27use axum::{middleware, routing, Router};
28use common_base::readable_size::ReadableSize;
29use common_base::Plugins;
30use common_recordbatch::RecordBatch;
31use common_telemetry::{error, info};
32use common_time::timestamp::TimeUnit;
33use common_time::Timestamp;
34use datatypes::data_type::DataType;
35use datatypes::schema::SchemaRef;
36use datatypes::value::transform_value_ref_to_json_value;
37use event::{LogState, LogValidatorRef};
38use futures::FutureExt;
39use http::{HeaderValue, Method};
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use snafu::{ensure, ResultExt};
43use tokio::sync::oneshot::{self, Sender};
44use tokio::sync::Mutex;
45use tower::ServiceBuilder;
46use tower_http::compression::CompressionLayer;
47use tower_http::cors::{AllowOrigin, Any, CorsLayer};
48use tower_http::decompression::RequestDecompressionLayer;
49use tower_http::trace::TraceLayer;
50
51use self::authorize::AuthState;
52use self::result::table_result::TableResponse;
53use crate::configurator::ConfiguratorRef;
54use crate::elasticsearch;
55use crate::error::{
56    AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
57    ToJsonSnafu,
58};
59use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
60use crate::http::prom_store::PromStoreState;
61use crate::http::prometheus::{
62    build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
63    range_query, series_query,
64};
65use crate::http::result::arrow_result::ArrowResponse;
66use crate::http::result::csv_result::CsvResponse;
67use crate::http::result::error_result::ErrorResponse;
68use crate::http::result::greptime_result_v1::GreptimedbV1Response;
69use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
70use crate::http::result::json_result::JsonResponse;
71use crate::interceptor::LogIngestInterceptorRef;
72use crate::metrics::http_metrics_layer;
73use crate::metrics_handler::MetricsHandler;
74use crate::prometheus_handler::PrometheusHandlerRef;
75use crate::query_handler::sql::ServerSqlQueryHandlerRef;
76use crate::query_handler::{
77    InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
78    OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
79    PromStoreProtocolHandlerRef,
80};
81use crate::server::Server;
82
83pub mod authorize;
84#[cfg(feature = "dashboard")]
85mod dashboard;
86pub mod dyn_log;
87pub mod event;
88mod extractor;
89pub mod handler;
90pub mod header;
91pub mod influxdb;
92pub mod jaeger;
93pub mod logs;
94pub mod loki;
95pub mod mem_prof;
96pub mod opentsdb;
97pub mod otlp;
98pub mod pprof;
99pub mod prom_store;
100pub mod prometheus;
101pub mod result;
102mod timeout;
103
104pub(crate) use timeout::DynamicTimeoutLayer;
105
106mod hints;
107mod read_preference;
108#[cfg(any(test, feature = "testing"))]
109pub mod test_helpers;
110
111pub const HTTP_API_VERSION: &str = "v1";
112pub const HTTP_API_PREFIX: &str = "/v1/";
113/// Default HTTP body limit (64M).
114const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64);
115
116/// Authorization header
117pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth";
118
119// TODO(fys): This is a temporary workaround; it will be improved later.
120pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
121
122#[derive(Default)]
123pub struct HttpServer {
124    router: StdMutex<Router>,
125    shutdown_tx: Mutex<Option<Sender<()>>>,
126    user_provider: Option<UserProviderRef>,
127
128    // plugins
129    plugins: Plugins,
130
131    // server configs
132    options: HttpOptions,
133    bind_addr: Option<SocketAddr>,
134}
135
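/// HTTP server options.
///
/// All fields have defaults (see the `Default` impl below), so callers only need to
/// override what they care about. A minimal sketch (values are illustrative):
///
/// ```rust,ignore
/// let opts = HttpOptions {
///     addr: "0.0.0.0:4000".to_string(),
///     cors_allowed_origins: vec!["https://example.com".to_string()],
///     ..Default::default()
/// };
/// ```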
136#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
137#[serde(default)]
138pub struct HttpOptions {
139    pub addr: String,
140
141    #[serde(with = "humantime_serde")]
142    pub timeout: Duration,
143
144    #[serde(skip)]
145    pub disable_dashboard: bool,
146
147    pub body_limit: ReadableSize,
148
149    /// Validation mode while decoding Prometheus remote write requests.
150    pub prom_validation_mode: PromValidationMode,
151
152    pub cors_allowed_origins: Vec<String>,
153
154    pub enable_cors: bool,
155}
156
157#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum PromValidationMode {
160    /// Force UTF8 validation
161    Strict,
162    /// Allow lossy UTF8 strings
163    Lossy,
164    /// Do not validate UTF8 strings.
165    Unchecked,
166}
167
168impl Default for HttpOptions {
169    fn default() -> Self {
170        Self {
171            addr: "127.0.0.1:4000".to_string(),
172            timeout: Duration::from_secs(0),
173            disable_dashboard: false,
174            body_limit: DEFAULT_BODY_LIMIT,
175            cors_allowed_origins: Vec::new(),
176            enable_cors: true,
177            prom_validation_mode: PromValidationMode::Strict,
178        }
179    }
180}
181
182#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
183pub struct ColumnSchema {
184    name: String,
185    data_type: String,
186}
187
188impl ColumnSchema {
189    pub fn new(name: String, data_type: String) -> ColumnSchema {
190        ColumnSchema { name, data_type }
191    }
192}
193
194#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
195pub struct OutputSchema {
196    column_schemas: Vec<ColumnSchema>,
197}
198
199impl OutputSchema {
200    pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
201        OutputSchema {
202            column_schemas: columns,
203        }
204    }
205}
206
207impl From<SchemaRef> for OutputSchema {
208    fn from(schema: SchemaRef) -> OutputSchema {
209        OutputSchema {
210            column_schemas: schema
211                .column_schemas()
212                .iter()
213                .map(|cs| ColumnSchema {
214                    name: cs.name.clone(),
215                    data_type: cs.data_type.name(),
216                })
217                .collect(),
218        }
219    }
220}
221
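/// The `records` part of a query output. Serialized to JSON it looks roughly like
/// `{"schema": {"column_schemas": [{"name": ..., "data_type": ...}]}, "rows": [[...]], "total_rows": N}`
/// (a sketch; `metrics` is omitted when empty).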
222#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
223pub struct HttpRecordsOutput {
224    schema: OutputSchema,
225    rows: Vec<Vec<Value>>,
226    // total_rows is equal to rows.len() in most cases,
227    // but the Dashboard query result may be truncated, so total_rows is returned as well.
228    #[serde(default)]
229    total_rows: usize,
230
231    // plan-level execution metrics
232    #[serde(skip_serializing_if = "HashMap::is_empty")]
233    #[serde(default)]
234    metrics: HashMap<String, Value>,
235}
236
237impl HttpRecordsOutput {
238    pub fn num_rows(&self) -> usize {
239        self.rows.len()
240    }
241
242    pub fn num_cols(&self) -> usize {
243        self.schema.column_schemas.len()
244    }
245
246    pub fn schema(&self) -> &OutputSchema {
247        &self.schema
248    }
249
250    pub fn rows(&self) -> &Vec<Vec<Value>> {
251        &self.rows
252    }
253}
254
255impl HttpRecordsOutput {
256    pub fn try_new(
257        schema: SchemaRef,
258        recordbatches: Vec<RecordBatch>,
259    ) -> std::result::Result<HttpRecordsOutput, Error> {
260        if recordbatches.is_empty() {
261            Ok(HttpRecordsOutput {
262                schema: OutputSchema::from(schema),
263                rows: vec![],
264                total_rows: 0,
265                metrics: Default::default(),
266            })
267        } else {
268            let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
269            let mut rows = Vec::with_capacity(num_rows);
270            let schemas = schema.column_schemas();
271            let num_cols = schema.column_schemas().len();
272            rows.resize_with(num_rows, || Vec::with_capacity(num_cols));
273
274            let mut finished_row_cursor = 0;
275            for recordbatch in recordbatches {
276                for (col_idx, col) in recordbatch.columns().iter().enumerate() {
277                    // Indexing is safe here: `schemas` has the same length as the recordbatch's columns.
278                    let schema = &schemas[col_idx];
279                    for row_idx in 0..recordbatch.num_rows() {
280                        let value = transform_value_ref_to_json_value(col.get_ref(row_idx), schema)
281                            .context(ToJsonSnafu)?;
282                        rows[row_idx + finished_row_cursor].push(value);
283                    }
284                }
285                finished_row_cursor += recordbatch.num_rows();
286            }
287
288            Ok(HttpRecordsOutput {
289                schema: OutputSchema::from(schema),
290                total_rows: rows.len(),
291                rows,
292                metrics: Default::default(),
293            })
294        }
295    }
296}
297
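/// One element of the `output` array in a query response: either the number of
/// affected rows or a set of records. With `rename_all = "lowercase"` this serializes
/// roughly as `{"affectedrows": 1}` or `{"records": {...}}` (illustrative values).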
298#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
299#[serde(rename_all = "lowercase")]
300pub enum GreptimeQueryOutput {
301    AffectedRows(usize),
302    Records(HttpRecordsOutput),
303}
304
305/// The format in which the results of SQL queries are presented.
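///
/// A minimal usage sketch of the `parse`/`as_str` round trip (values are illustrative):
///
/// ```rust,ignore
/// assert_eq!(ResponseFormat::parse("csv"), Some(ResponseFormat::Csv));
/// assert_eq!(ResponseFormat::Csv.as_str(), "csv");
/// assert_eq!(ResponseFormat::parse("xml"), None);
/// ```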
306#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
307pub enum ResponseFormat {
308    Arrow,
309    Csv,
310    Table,
311    #[default]
312    GreptimedbV1,
313    InfluxdbV1,
314    Json,
315}
316
317impl ResponseFormat {
318    pub fn parse(s: &str) -> Option<Self> {
319        match s {
320            "arrow" => Some(ResponseFormat::Arrow),
321            "csv" => Some(ResponseFormat::Csv),
322            "table" => Some(ResponseFormat::Table),
323            "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
324            "influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
325            "json" => Some(ResponseFormat::Json),
326            _ => None,
327        }
328    }
329
330    pub fn as_str(&self) -> &'static str {
331        match self {
332            ResponseFormat::Arrow => "arrow",
333            ResponseFormat::Csv => "csv",
334            ResponseFormat::Table => "table",
335            ResponseFormat::GreptimedbV1 => "greptimedb_v1",
336            ResponseFormat::InfluxdbV1 => "influxdb_v1",
337            ResponseFormat::Json => "json",
338        }
339    }
340}
341
342#[derive(Debug, Clone, Copy, PartialEq, Eq)]
343pub enum Epoch {
344    Nanosecond,
345    Microsecond,
346    Millisecond,
347    Second,
348}
349
350impl Epoch {
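    /// A minimal usage sketch (unit strings follow the InfluxDB `epoch` query parameter):
    ///
    /// ```rust,ignore
    /// assert_eq!(Epoch::parse("ms"), Some(Epoch::Millisecond));
    /// assert_eq!(Epoch::parse("µ"), Some(Epoch::Microsecond));
    /// assert_eq!(Epoch::parse("h"), None);
    /// ```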
351    pub fn parse(s: &str) -> Option<Epoch> {
352        // Both "u" and "µ" indicate microseconds.
353        // epoch = [ns, u, µ, ms, s].
354        // For details, see the InfluxDB documentation:
355        // https://docs.influxdata.com/influxdb/v1/tools/api/#query-string-parameters-1
356        match s {
357            "ns" => Some(Epoch::Nanosecond),
358            "u" | "µ" => Some(Epoch::Microsecond),
359            "ms" => Some(Epoch::Millisecond),
360            "s" => Some(Epoch::Second),
361            _ => None, // any other unit is unsupported
362        }
363    }
364
365    pub fn convert_timestamp(&self, ts: Timestamp) -> Option<Timestamp> {
366        match self {
367            Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond),
368            Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond),
369            Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond),
370            Epoch::Second => ts.convert_to(TimeUnit::Second),
371        }
372    }
373}
374
375impl Display for Epoch {
376    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377        match self {
378            Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"),
379            Epoch::Microsecond => write!(f, "Epoch::Microsecond"),
380            Epoch::Millisecond => write!(f, "Epoch::Millisecond"),
381            Epoch::Second => write!(f, "Epoch::Second"),
382        }
383    }
384}
385
386#[derive(Serialize, Deserialize, Debug)]
387pub enum HttpResponse {
388    Arrow(ArrowResponse),
389    Csv(CsvResponse),
390    Table(TableResponse),
391    Error(ErrorResponse),
392    GreptimedbV1(GreptimedbV1Response),
393    InfluxdbV1(InfluxdbV1Response),
394    Json(JsonResponse),
395}
396
397impl HttpResponse {
398    pub fn with_execution_time(self, execution_time: u64) -> Self {
399        match self {
400            HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
401            HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
402            HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
403            HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
404            HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
405            HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
406            HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
407        }
408    }
409
410    pub fn with_limit(self, limit: usize) -> Self {
411        match self {
412            HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
413            HttpResponse::Table(resp) => resp.with_limit(limit).into(),
414            HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
415            HttpResponse::Json(resp) => resp.with_limit(limit).into(),
416            _ => self,
417        }
418    }
419}
420
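/// Truncates every `Records` output to at most `limit` rows and updates its
/// `total_rows` accordingly; `AffectedRows` outputs pass through unchanged.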
421pub fn process_with_limit(
422    mut outputs: Vec<GreptimeQueryOutput>,
423    limit: usize,
424) -> Vec<GreptimeQueryOutput> {
425    outputs
426        .drain(..)
427        .map(|data| match data {
428            GreptimeQueryOutput::Records(mut records) => {
429                if records.rows.len() > limit {
430                    records.rows.truncate(limit);
431                    records.total_rows = limit;
432                }
433                GreptimeQueryOutput::Records(records)
434            }
435            _ => data,
436        })
437        .collect()
438}
439
440impl IntoResponse for HttpResponse {
441    fn into_response(self) -> Response {
442        match self {
443            HttpResponse::Arrow(resp) => resp.into_response(),
444            HttpResponse::Csv(resp) => resp.into_response(),
445            HttpResponse::Table(resp) => resp.into_response(),
446            HttpResponse::GreptimedbV1(resp) => resp.into_response(),
447            HttpResponse::InfluxdbV1(resp) => resp.into_response(),
448            HttpResponse::Json(resp) => resp.into_response(),
449            HttpResponse::Error(resp) => resp.into_response(),
450        }
451    }
452}
453
454impl From<ArrowResponse> for HttpResponse {
455    fn from(value: ArrowResponse) -> Self {
456        HttpResponse::Arrow(value)
457    }
458}
459
460impl From<CsvResponse> for HttpResponse {
461    fn from(value: CsvResponse) -> Self {
462        HttpResponse::Csv(value)
463    }
464}
465
466impl From<TableResponse> for HttpResponse {
467    fn from(value: TableResponse) -> Self {
468        HttpResponse::Table(value)
469    }
470}
471
472impl From<ErrorResponse> for HttpResponse {
473    fn from(value: ErrorResponse) -> Self {
474        HttpResponse::Error(value)
475    }
476}
477
478impl From<GreptimedbV1Response> for HttpResponse {
479    fn from(value: GreptimedbV1Response) -> Self {
480        HttpResponse::GreptimedbV1(value)
481    }
482}
483
484impl From<InfluxdbV1Response> for HttpResponse {
485    fn from(value: InfluxdbV1Response) -> Self {
486        HttpResponse::InfluxdbV1(value)
487    }
488}
489
490impl From<JsonResponse> for HttpResponse {
491    fn from(value: JsonResponse) -> Self {
492        HttpResponse::Json(value)
493    }
494}
495
496#[derive(Clone)]
497pub struct ApiState {
498    pub sql_handler: ServerSqlQueryHandlerRef,
499}
500
501#[derive(Clone)]
502pub struct GreptimeOptionsConfigState {
503    pub greptime_config_options: String,
504}
505
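/// Builder for [`HttpServer`]. A rough construction flow, mirroring how the tests
/// wire things up (the handler value is a placeholder):
///
/// ```rust,ignore
/// let server = HttpServerBuilder::new(HttpOptions::default())
///     .with_sql_handler(sql_handler)
///     .build();
/// // `make_app()` assembles the root routes; `build()` attaches the middlewares.
/// let app = server.build(server.make_app())?;
/// ```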
506#[derive(Default)]
507pub struct HttpServerBuilder {
508    options: HttpOptions,
509    plugins: Plugins,
510    user_provider: Option<UserProviderRef>,
511    router: Router,
512}
513
514impl HttpServerBuilder {
515    pub fn new(options: HttpOptions) -> Self {
516        Self {
517            options,
518            plugins: Plugins::default(),
519            user_provider: None,
520            router: Router::new(),
521        }
522    }
523
524    pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
525        let sql_router = HttpServer::route_sql(ApiState { sql_handler });
526
527        Self {
528            router: self
529                .router
530                .nest(&format!("/{HTTP_API_VERSION}"), sql_router),
531            ..self
532        }
533    }
534
535    pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self {
536        let logs_router = HttpServer::route_logs(logs_handler);
537
538        Self {
539            router: self
540                .router
541                .nest(&format!("/{HTTP_API_VERSION}"), logs_router),
542            ..self
543        }
544    }
545
546    pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
547        Self {
548            router: self.router.nest(
549                &format!("/{HTTP_API_VERSION}/opentsdb"),
550                HttpServer::route_opentsdb(handler),
551            ),
552            ..self
553        }
554    }
555
556    pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self {
557        Self {
558            router: self.router.nest(
559                &format!("/{HTTP_API_VERSION}/influxdb"),
560                HttpServer::route_influxdb(handler),
561            ),
562            ..self
563        }
564    }
565
566    pub fn with_prom_handler(
567        self,
568        handler: PromStoreProtocolHandlerRef,
569        pipeline_handler: Option<PipelineHandlerRef>,
570        prom_store_with_metric_engine: bool,
571        prom_validation_mode: PromValidationMode,
572    ) -> Self {
573        let state = PromStoreState {
574            prom_store_handler: handler,
575            pipeline_handler,
576            prom_store_with_metric_engine,
577            prom_validation_mode,
578        };
579
580        Self {
581            router: self.router.nest(
582                &format!("/{HTTP_API_VERSION}/prometheus"),
583                HttpServer::route_prom(state),
584            ),
585            ..self
586        }
587    }
588
589    pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self {
590        Self {
591            router: self.router.nest(
592                &format!("/{HTTP_API_VERSION}/prometheus/api/v1"),
593                HttpServer::route_prometheus(handler),
594            ),
595            ..self
596        }
597    }
598
599    pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self {
600        Self {
601            router: self.router.nest(
602                &format!("/{HTTP_API_VERSION}/otlp"),
603                HttpServer::route_otlp(handler),
604            ),
605            ..self
606        }
607    }
608
609    pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self {
610        Self {
611            user_provider: Some(user_provider),
612            ..self
613        }
614    }
615
616    pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self {
617        Self {
618            router: self.router.merge(HttpServer::route_metrics(handler)),
619            ..self
620        }
621    }
622
623    pub fn with_log_ingest_handler(
624        self,
625        handler: PipelineHandlerRef,
626        validator: Option<LogValidatorRef>,
627        ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
628    ) -> Self {
629        let log_state = LogState {
630            log_handler: handler,
631            log_validator: validator,
632            ingest_interceptor,
633        };
634
635        let router = self.router.nest(
636            &format!("/{HTTP_API_VERSION}"),
637            HttpServer::route_pipelines(log_state.clone()),
638        );
639        // Deprecated since v0.11.0. Use `/logs` and `/pipelines` instead.
640        let router = router.nest(
641            &format!("/{HTTP_API_VERSION}/events"),
642            #[allow(deprecated)]
643            HttpServer::route_log_deprecated(log_state.clone()),
644        );
645
646        let router = router.nest(
647            &format!("/{HTTP_API_VERSION}/loki"),
648            HttpServer::route_loki(log_state.clone()),
649        );
650
651        let router = router.nest(
652            &format!("/{HTTP_API_VERSION}/elasticsearch"),
653            HttpServer::route_elasticsearch(log_state.clone()),
654        );
655
656        let router = router.nest(
657            &format!("/{HTTP_API_VERSION}/elasticsearch/"),
658            Router::new()
659                .route("/", routing::get(elasticsearch::handle_get_version))
660                .with_state(log_state),
661        );
662
663        Self { router, ..self }
664    }
665
666    pub fn with_plugins(self, plugins: Plugins) -> Self {
667        Self { plugins, ..self }
668    }
669
670    pub fn with_greptime_config_options(self, opts: String) -> Self {
671        let config_router = HttpServer::route_config(GreptimeOptionsConfigState {
672            greptime_config_options: opts,
673        });
674
675        Self {
676            router: self.router.merge(config_router),
677            ..self
678        }
679    }
680
681    pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self {
682        Self {
683            router: self.router.nest(
684                &format!("/{HTTP_API_VERSION}/jaeger"),
685                HttpServer::route_jaeger(handler),
686            ),
687            ..self
688        }
689    }
690
691    pub fn with_extra_router(self, router: Router) -> Self {
692        Self {
693            router: self.router.merge(router),
694            ..self
695        }
696    }
697
698    pub fn build(self) -> HttpServer {
699        HttpServer {
700            options: self.options,
701            user_provider: self.user_provider,
702            shutdown_tx: Mutex::new(None),
703            plugins: self.plugins,
704            router: StdMutex::new(self.router),
705            bind_addr: None,
706        }
707    }
708}
709
710impl HttpServer {
711    /// Gets the router and adds necessary root routes (health, status, dashboard).
712    pub fn make_app(&self) -> Router {
713        let mut router = {
714            let router = self.router.lock().unwrap();
715            router.clone()
716        };
717
718        router = router
719            .route(
720                "/health",
721                routing::get(handler::health).post(handler::health),
722            )
723            .route(
724                "/ready",
725                routing::get(handler::health).post(handler::health),
726            );
727
728        router = router.route("/status", routing::get(handler::status));
729
730        #[cfg(feature = "dashboard")]
731        {
732            if !self.options.disable_dashboard {
733                info!("Enable dashboard service at '/dashboard'");
734                // redirect /dashboard to /dashboard/
735                router = router.route(
736                    "/dashboard",
737                    routing::get(|uri: axum::http::uri::Uri| async move {
738                        let path = uri.path();
739                        let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
740
741                        let new_uri = format!("{}/{}", path, query);
742                        axum::response::Redirect::permanent(&new_uri)
743                    }),
744                );
745
746                // "/dashboard" and "/dashboard/" are two different paths in Axum.
747                // We cannot nest "/dashboard/", because we already map "/dashboard/{*x}" when nesting "/dashboard".
748                // So we explicitly route "/dashboard/" here.
749                router = router
750                    .route(
751                        "/dashboard/",
752                        routing::get(dashboard::static_handler).post(dashboard::static_handler),
753                    )
754                    .route(
755                        "/dashboard/{*x}",
756                        routing::get(dashboard::static_handler).post(dashboard::static_handler),
757                    );
758            }
759        }
760
761        // Add a layer to collect HTTP metrics for axum.
762        router = router.route_layer(middleware::from_fn(http_metrics_layer));
763
764        router
765    }
766
767    /// Attaches middlewares and debug routes to the router.
768    /// Callers should call this method after [HttpServer::make_app()].
769    pub fn build(&self, router: Router) -> Result<Router> {
770        let timeout_layer = if self.options.timeout != Duration::default() {
771            Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout)))
772        } else {
773            info!("HTTP server timeout is disabled");
774            None
775        };
776        let body_limit_layer = if self.options.body_limit != ReadableSize(0) {
777            Some(
778                ServiceBuilder::new()
779                    .layer(DefaultBodyLimit::max(self.options.body_limit.0 as usize)),
780            )
781        } else {
782            info!("HTTP server body limit is disabled");
783            None
784        };
785        let cors_layer = if self.options.enable_cors {
786            Some(
787                CorsLayer::new()
788                    .allow_methods([
789                        Method::GET,
790                        Method::POST,
791                        Method::PUT,
792                        Method::DELETE,
793                        Method::HEAD,
794                    ])
795                    .allow_origin(if self.options.cors_allowed_origins.is_empty() {
796                        AllowOrigin::from(Any)
797                    } else {
798                        AllowOrigin::from(
799                            self.options
800                                .cors_allowed_origins
801                                .iter()
802                                .map(|s| {
803                                    HeaderValue::from_str(s.as_str())
804                                        .context(InvalidHeaderValueSnafu)
805                                })
806                                .collect::<Result<Vec<HeaderValue>>>()?,
807                        )
808                    })
809                    .allow_headers(Any),
810            )
811        } else {
812            info!("HTTP server cross-origin is disabled");
813            None
814        };
815
816        Ok(router
817            // middlewares
818            .layer(
819                ServiceBuilder::new()
820                    // Disable on-failure tracing, because printing it out isn't very helpful,
821                    // and we have `impl IntoResponse for Error`, which prints more detailed error messages.
822                    .layer(TraceLayer::new_for_http().on_failure(()))
823                    .option_layer(cors_layer)
824                    .option_layer(timeout_layer)
825                    .option_layer(body_limit_layer)
826                    // auth layer
827                    .layer(middleware::from_fn_with_state(
828                        AuthState::new(self.user_provider.clone()),
829                        authorize::check_http_auth,
830                    ))
831                    .layer(middleware::from_fn(hints::extract_hints))
832                    .layer(middleware::from_fn(
833                        read_preference::extract_read_preference,
834                    )),
835            )
836            // Handlers for debug, we don't expect a timeout.
837            .nest(
838                "/debug",
839                Router::new()
840                    // handler for changing log level dynamically
841                    .route("/log_level", routing::post(dyn_log::dyn_log_handler))
842                    .nest(
843                        "/prof",
844                        Router::new()
845                            .route("/cpu", routing::post(pprof::pprof_handler))
846                            .route("/mem", routing::post(mem_prof::mem_prof_handler)),
847                    ),
848            ))
849    }
850
851    fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
852        Router::new()
853            .route("/metrics", routing::get(handler::metrics))
854            .with_state(metrics_handler)
855    }
856
857    fn route_loki<S>(log_state: LogState) -> Router<S> {
858        Router::new()
859            .route("/api/v1/push", routing::post(loki::loki_ingest))
860            .layer(
861                ServiceBuilder::new()
862                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
863            )
864            .with_state(log_state)
865    }
866
867    fn route_elasticsearch<S>(log_state: LogState) -> Router<S> {
868        Router::new()
869            // Return fake response for HEAD '/' request.
870            .route(
871                "/",
872                routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
873            )
874            // Return fake response for Elasticsearch version request.
875            .route("/", routing::get(elasticsearch::handle_get_version))
876            // Return fake response for Elasticsearch license request.
877            .route("/_license", routing::get(elasticsearch::handle_get_license))
878            .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
879            .route(
880                "/{index}/_bulk",
881                routing::post(elasticsearch::handle_bulk_api_with_index),
882            )
883            // Return fake response for Elasticsearch ILM request.
884            .route(
885                "/_ilm/policy/{*path}",
886                routing::any((
887                    HttpStatusCode::OK,
888                    elasticsearch::elasticsearch_headers(),
889                    axum::Json(serde_json::json!({})),
890                )),
891            )
892            // Return fake response for Elasticsearch index template request.
893            .route(
894                "/_index_template/{*path}",
895                routing::any((
896                    HttpStatusCode::OK,
897                    elasticsearch::elasticsearch_headers(),
898                    axum::Json(serde_json::json!({})),
899                )),
900            )
901            // Return fake response for Elasticsearch ingest pipeline request.
902            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html.
903            .route(
904                "/_ingest/{*path}",
905                routing::any((
906                    HttpStatusCode::OK,
907                    elasticsearch::elasticsearch_headers(),
908                    axum::Json(serde_json::json!({})),
909                )),
910            )
911            // Return fake response for Elasticsearch nodes discovery request.
912            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html.
913            .route(
914                "/_nodes/{*path}",
915                routing::any((
916                    HttpStatusCode::OK,
917                    elasticsearch::elasticsearch_headers(),
918                    axum::Json(serde_json::json!({})),
919                )),
920            )
921            // Return fake response for Logstash APIs requests.
922            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/logstash-apis.html
923            .route(
924                "/logstash/{*path}",
925                routing::any((
926                    HttpStatusCode::OK,
927                    elasticsearch::elasticsearch_headers(),
928                    axum::Json(serde_json::json!({})),
929                )),
930            )
931            .route(
932                "/_logstash/{*path}",
933                routing::any((
934                    HttpStatusCode::OK,
935                    elasticsearch::elasticsearch_headers(),
936                    axum::Json(serde_json::json!({})),
937                )),
938            )
939            .layer(ServiceBuilder::new().layer(RequestDecompressionLayer::new()))
940            .with_state(log_state)
941    }
942
943    #[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")]
944    fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
945        Router::new()
946            .route("/logs", routing::post(event::log_ingester))
947            .route(
948                "/pipelines/{pipeline_name}",
949                routing::get(event::query_pipeline),
950            )
951            .route(
952                "/pipelines/{pipeline_name}",
953                routing::post(event::add_pipeline),
954            )
955            .route(
956                "/pipelines/{pipeline_name}",
957                routing::delete(event::delete_pipeline),
958            )
959            .route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
960            .layer(
961                ServiceBuilder::new()
962                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
963            )
964            .with_state(log_state)
965    }
966
967    fn route_pipelines<S>(log_state: LogState) -> Router<S> {
968        Router::new()
969            .route("/ingest", routing::post(event::log_ingester))
970            .route(
971                "/pipelines/{pipeline_name}",
972                routing::get(event::query_pipeline),
973            )
974            .route(
975                "/pipelines/{pipeline_name}",
976                routing::post(event::add_pipeline),
977            )
978            .route(
979                "/pipelines/{pipeline_name}",
980                routing::delete(event::delete_pipeline),
981            )
982            .route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun))
983            .layer(
984                ServiceBuilder::new()
985                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
986            )
987            .with_state(log_state)
988    }
989
990    fn route_sql<S>(api_state: ApiState) -> Router<S> {
991        Router::new()
992            .route("/sql", routing::get(handler::sql).post(handler::sql))
993            .route(
994                "/sql/parse",
995                routing::get(handler::sql_parse).post(handler::sql_parse),
996            )
997            .route(
998                "/promql",
999                routing::get(handler::promql).post(handler::promql),
1000            )
1001            .with_state(api_state)
1002    }
1003
1004    fn route_logs<S>(log_handler: LogQueryHandlerRef) -> Router<S> {
1005        Router::new()
1006            .route("/logs", routing::get(logs::logs).post(logs::logs))
1007            .with_state(log_handler)
1008    }
1009
1010    /// Route Prometheus [HTTP API].
1011    ///
1012    /// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
1013    fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
1014        Router::new()
1015            .route(
1016                "/format_query",
1017                routing::post(format_query).get(format_query),
1018            )
1019            .route("/status/buildinfo", routing::get(build_info_query))
1020            .route("/query", routing::post(instant_query).get(instant_query))
1021            .route("/query_range", routing::post(range_query).get(range_query))
1022            .route("/labels", routing::post(labels_query).get(labels_query))
1023            .route("/series", routing::post(series_query).get(series_query))
1024            .route("/parse_query", routing::post(parse_query).get(parse_query))
1025            .route(
1026                "/label/{label_name}/values",
1027                routing::get(label_values_query),
1028            )
1029            .layer(ServiceBuilder::new().layer(CompressionLayer::new()))
1030            .with_state(prometheus_handler)
1031    }
1032
1033    /// Route the Prometheus remote [read] and [write] APIs. Elsewhere the related modules are
1034    /// called `prom_store`.
1035    ///
1036    /// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/
1037    /// [write]: https://prometheus.io/docs/concepts/remote_write_spec/
1038    fn route_prom<S>(state: PromStoreState) -> Router<S> {
1039        Router::new()
1040            .route("/read", routing::post(prom_store::remote_read))
1041            .route("/write", routing::post(prom_store::remote_write))
1042            .with_state(state)
1043    }
1044
1045    fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
1046        Router::new()
1047            .route("/write", routing::post(influxdb_write_v1))
1048            .route("/api/v2/write", routing::post(influxdb_write_v2))
1049            .layer(
1050                ServiceBuilder::new()
1051                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1052            )
1053            .route("/ping", routing::get(influxdb_ping))
1054            .route("/health", routing::get(influxdb_health))
1055            .with_state(influxdb_handler)
1056    }
1057
1058    fn route_opentsdb<S>(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
1059        Router::new()
1060            .route("/api/put", routing::post(opentsdb::put))
1061            .with_state(opentsdb_handler)
1062    }
1063
1064    fn route_otlp<S>(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
1065        Router::new()
1066            .route("/v1/metrics", routing::post(otlp::metrics))
1067            .route("/v1/traces", routing::post(otlp::traces))
1068            .route("/v1/logs", routing::post(otlp::logs))
1069            .layer(
1070                ServiceBuilder::new()
1071                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1072            )
1073            .with_state(otlp_handler)
1074    }
1075
1076    fn route_config<S>(state: GreptimeOptionsConfigState) -> Router<S> {
1077        Router::new()
1078            .route("/config", routing::get(handler::config))
1079            .with_state(state)
1080    }
1081
1082    fn route_jaeger<S>(handler: JaegerQueryHandlerRef) -> Router<S> {
1083        Router::new()
1084            .route("/api/services", routing::get(jaeger::handle_get_services))
1085            .route(
1086                "/api/services/{service_name}/operations",
1087                routing::get(jaeger::handle_get_operations_by_service),
1088            )
1089            .route(
1090                "/api/operations",
1091                routing::get(jaeger::handle_get_operations),
1092            )
1093            .route("/api/traces", routing::get(jaeger::handle_find_traces))
1094            .route(
1095                "/api/traces/{trace_id}",
1096                routing::get(jaeger::handle_get_trace),
1097            )
1098            .with_state(handler)
1099    }
1100}
1101
1102pub const HTTP_SERVER: &str = "HTTP_SERVER";
1103
1104#[async_trait]
1105impl Server for HttpServer {
1106    async fn shutdown(&self) -> Result<()> {
1107        let mut shutdown_tx = self.shutdown_tx.lock().await;
1108        if let Some(tx) = shutdown_tx.take() {
1109            if tx.send(()).is_err() {
1110                info!("Receiver dropped, the HTTP server has already exited");
1111            }
1112        }
1113        info!("Shutdown HTTP server");
1114
1115        Ok(())
1116    }
1117
1118    async fn start(&mut self, listening: SocketAddr) -> Result<()> {
1119        let (tx, rx) = oneshot::channel();
1120        let serve = {
1121            let mut shutdown_tx = self.shutdown_tx.lock().await;
1122            ensure!(
1123                shutdown_tx.is_none(),
1124                AlreadyStartedSnafu { server: "HTTP" }
1125            );
1126
1127            let mut app = self.make_app();
1128            if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
1129                app = configurator.config_http(app);
1130            }
1131            let app = self.build(app)?;
1132            let listener = tokio::net::TcpListener::bind(listening)
1133                .await
1134                .context(AddressBindSnafu { addr: listening })?
1135                .tap_io(|tcp_stream| {
1136                    if let Err(e) = tcp_stream.set_nodelay(true) {
1137                        error!(e; "Failed to set TCP_NODELAY on incoming connection");
1138                    }
1139                });
1140            let serve = axum::serve(listener, app.into_make_service());
1141
1142            // FIXME(yingwen): Support keepalive.
1143            // See:
1144            // - https://github.com/tokio-rs/axum/discussions/2939
1145            // - https://stackoverflow.com/questions/73069718/how-do-i-keep-alive-tokiotcpstream-in-rust
1146            // let server = axum::Server::try_bind(&listening)
1147            //     .with_context(|_| AddressBindSnafu { addr: listening })?
1148            //     .tcp_nodelay(true)
1149            //     // Enable TCP keepalive to close the dangling established connections.
1150            //     // It's configured to let the keepalive probes first send after the connection sits
1151            //     // idle for 59 minutes, and then send every 10 seconds for 6 times.
1152            //     // So the connection will be closed after roughly 1 hour.
1153            //     .tcp_keepalive(Some(Duration::from_secs(59 * 60)))
1154            //     .tcp_keepalive_interval(Some(Duration::from_secs(10)))
1155            //     .tcp_keepalive_retries(Some(6))
1156            //     .serve(app.into_make_service());
1157
1158            *shutdown_tx = Some(tx);
1159
1160            serve
1161        };
1162        let listening = serve.local_addr().context(InternalIoSnafu)?;
1163        info!("HTTP server is bound to {}", listening);
1164
1165        common_runtime::spawn_global(async move {
1166            if let Err(e) = serve
1167                .with_graceful_shutdown(rx.map(drop))
1168                .await
1169                .context(InternalIoSnafu)
1170            {
1171                error!(e; "Failed to shutdown http server");
1172            }
1173        });
1174
1175        self.bind_addr = Some(listening);
1176        Ok(())
1177    }
1178
1179    fn name(&self) -> &str {
1180        HTTP_SERVER
1181    }
1182
1183    fn bind_addr(&self) -> Option<SocketAddr> {
1184        self.bind_addr
1185    }
1186}
1187
1188#[cfg(test)]
1189mod test {
1190    use std::future::pending;
1191    use std::io::Cursor;
1192    use std::sync::Arc;
1193
1194    use arrow_ipc::reader::FileReader;
1195    use arrow_schema::DataType;
1196    use axum::handler::Handler;
1197    use axum::http::StatusCode;
1198    use axum::routing::get;
1199    use common_query::Output;
1200    use common_recordbatch::RecordBatches;
1201    use datafusion_expr::LogicalPlan;
1202    use datatypes::prelude::*;
1203    use datatypes::schema::{ColumnSchema, Schema};
1204    use datatypes::vectors::{StringVector, UInt32Vector};
1205    use header::constants::GREPTIME_DB_HEADER_TIMEOUT;
1206    use query::parser::PromQuery;
1207    use query::query_engine::DescribeResult;
1208    use session::context::QueryContextRef;
1209    use tokio::sync::mpsc;
1210    use tokio::time::Instant;
1211
1212    use super::*;
1213    use crate::error::Error;
1214    use crate::http::test_helpers::TestClient;
1215    use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
1216
1217    struct DummyInstance {
1218        _tx: mpsc::Sender<(String, Vec<u8>)>,
1219    }
1220
1221    #[async_trait]
1222    impl SqlQueryHandler for DummyInstance {
1223        type Error = Error;
1224
1225        async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
1226            unimplemented!()
1227        }
1228
1229        async fn do_promql_query(
1230            &self,
1231            _: &PromQuery,
1232            _: QueryContextRef,
1233        ) -> Vec<std::result::Result<Output, Self::Error>> {
1234            unimplemented!()
1235        }
1236
1237        async fn do_exec_plan(
1238            &self,
1239            _plan: LogicalPlan,
1240            _query_ctx: QueryContextRef,
1241        ) -> std::result::Result<Output, Self::Error> {
1242            unimplemented!()
1243        }
1244
1245        async fn do_describe(
1246            &self,
1247            _stmt: sql::statements::statement::Statement,
1248            _query_ctx: QueryContextRef,
1249        ) -> Result<Option<DescribeResult>> {
1250            unimplemented!()
1251        }
1252
1253        async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
1254            Ok(true)
1255        }
1256    }
1257
1258    fn timeout() -> DynamicTimeoutLayer {
1259        DynamicTimeoutLayer::new(Duration::from_millis(10))
1260    }
1261
1262    async fn forever() {
1263        pending().await
1264    }
1265
1266    fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
1267        make_test_app_custom(tx, HttpOptions::default())
1268    }
1269
1270    fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
1271        let instance = Arc::new(DummyInstance { _tx: tx });
1272        let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
1273        let server = HttpServerBuilder::new(options)
1274            .with_sql_handler(sql_instance)
1275            .build();
1276        server.build(server.make_app()).unwrap().route(
1277            "/test/timeout",
1278            get(forever.layer(ServiceBuilder::new().layer(timeout()))),
1279        )
1280    }
1281
1282    #[tokio::test]
1283    pub async fn test_cors() {
1284        // cors is on by default
1285        let (tx, _rx) = mpsc::channel(100);
1286        let app = make_test_app(tx);
1287        let client = TestClient::new(app).await;
1288
1289        let res = client.get("/health").send().await;
1290
1291        assert_eq!(res.status(), StatusCode::OK);
1292        assert_eq!(
1293            res.headers()
1294                .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1295                .expect("expect cors header origin"),
1296            "*"
1297        );
1298
1299        let res = client
1300            .options("/health")
1301            .header("Access-Control-Request-Headers", "x-greptime-auth")
1302            .header("Access-Control-Request-Method", "DELETE")
1303            .header("Origin", "https://example.com")
1304            .send()
1305            .await;
1306        assert_eq!(res.status(), StatusCode::OK);
1307        assert_eq!(
1308            res.headers()
1309                .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1310                .expect("expect cors header origin"),
1311            "*"
1312        );
1313        assert_eq!(
1314            res.headers()
1315                .get(http::header::ACCESS_CONTROL_ALLOW_HEADERS)
1316                .expect("expect cors header headers"),
1317            "*"
1318        );
1319        assert_eq!(
1320            res.headers()
1321                .get(http::header::ACCESS_CONTROL_ALLOW_METHODS)
1322                .expect("expect cors header methods"),
1323            "GET,POST,PUT,DELETE,HEAD"
1324        );
1325    }
1326
1327    #[tokio::test]
1328    pub async fn test_cors_custom_origins() {
1329        // cors is on by default
1330        let (tx, _rx) = mpsc::channel(100);
1331        let origin = "https://example.com";
1332
1333        let options = HttpOptions {
1334            cors_allowed_origins: vec![origin.to_string()],
1335            ..Default::default()
1336        };
1337
1338        let app = make_test_app_custom(tx, options);
1339        let client = TestClient::new(app).await;
1340
1341        let res = client.get("/health").header("Origin", origin).send().await;
1342
1343        assert_eq!(res.status(), StatusCode::OK);
1344        assert_eq!(
1345            res.headers()
1346                .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1347                .expect("expect cors header origin"),
1348            origin
1349        );
1350
1351        let res = client
1352            .get("/health")
1353            .header("Origin", "https://notallowed.com")
1354            .send()
1355            .await;
1356
1357        assert_eq!(res.status(), StatusCode::OK);
1358        assert!(!res
1359            .headers()
1360            .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1361    }
1362
1363    #[tokio::test]
1364    pub async fn test_cors_disabled() {
1365        // cors is on by default
1366        let (tx, _rx) = mpsc::channel(100);
1367
1368        let options = HttpOptions {
1369            enable_cors: false,
1370            ..Default::default()
1371        };
1372
1373        let app = make_test_app_custom(tx, options);
1374        let client = TestClient::new(app).await;
1375
1376        let res = client.get("/health").send().await;
1377
1378        assert_eq!(res.status(), StatusCode::OK);
1379        assert!(!res
1380            .headers()
1381            .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1382    }
1383
1384    #[test]
1385    fn test_http_options_default() {
1386        let default = HttpOptions::default();
1387        assert_eq!("127.0.0.1:4000".to_string(), default.addr);
1388        assert_eq!(Duration::from_secs(0), default.timeout)
1389    }
1390
1391    #[tokio::test]
1392    async fn test_http_server_request_timeout() {
1393        common_telemetry::init_default_ut_logging();
1394
1395        let (tx, _rx) = mpsc::channel(100);
1396        let app = make_test_app(tx);
1397        let client = TestClient::new(app).await;
1398        let res = client.get("/test/timeout").send().await;
1399        assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1400
1401        let now = Instant::now();
1402        let res = client
1403            .get("/test/timeout")
1404            .header(GREPTIME_DB_HEADER_TIMEOUT, "20ms")
1405            .send()
1406            .await;
1407        assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1408        let elapsed = now.elapsed();
1409        assert!(elapsed > Duration::from_millis(15));
1410
1411        tokio::time::timeout(
1412            Duration::from_millis(15),
1413            client
1414                .get("/test/timeout")
1415                .header(GREPTIME_DB_HEADER_TIMEOUT, "0s")
1416                .send(),
1417        )
1418        .await
1419        .unwrap_err();
1420
1421        tokio::time::timeout(
1422            Duration::from_millis(15),
1423            client
1424                .get("/test/timeout")
1425                .header(
1426                    GREPTIME_DB_HEADER_TIMEOUT,
1427                    humantime::format_duration(Duration::default()).to_string(),
1428                )
1429                .send(),
1430        )
1431        .await
1432        .unwrap_err();
1433    }
1434
1435    #[tokio::test]
1436    async fn test_schema_for_empty_response() {
1437        let column_schemas = vec![
1438            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1439            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1440        ];
1441        let schema = Arc::new(Schema::new(column_schemas));
1442
1443        let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
1444        let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1445
1446        let json_resp = GreptimedbV1Response::from_output(outputs).await;
1447        if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
1448            let json_output = &json_resp.output[0];
1449            if let GreptimeQueryOutput::Records(r) = json_output {
1450                assert_eq!(r.num_rows(), 0);
1451                assert_eq!(r.num_cols(), 2);
1452                assert_eq!(r.schema.column_schemas[0].name, "numbers");
1453                assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1454            } else {
1455                panic!("invalid output type");
1456            }
1457        } else {
1458            panic!("invalid format")
1459        }
1460    }
1461
1462    #[tokio::test]
1463    async fn test_recordbatches_conversion() {
1464        let column_schemas = vec![
1465            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1466            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1467        ];
1468        let schema = Arc::new(Schema::new(column_schemas));
1469        let columns: Vec<VectorRef> = vec![
1470            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
1471            Arc::new(StringVector::from(vec![
1472                None,
1473                Some("hello"),
1474                Some("greptime"),
1475                None,
1476            ])),
1477        ];
1478        let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
1479
1480        for format in [
1481            ResponseFormat::GreptimedbV1,
1482            ResponseFormat::InfluxdbV1,
1483            ResponseFormat::Csv,
1484            ResponseFormat::Table,
1485            ResponseFormat::Arrow,
1486            ResponseFormat::Json,
1487        ] {
1488            let recordbatches =
1489                RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
1490            let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1491            let json_resp = match format {
1492                ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
1493                ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
1494                ResponseFormat::Table => TableResponse::from_output(outputs).await,
1495                ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
1496                ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
1497                ResponseFormat::Json => JsonResponse::from_output(outputs).await,
1498            };
1499
1500            match json_resp {
1501                HttpResponse::GreptimedbV1(resp) => {
1502                    let json_output = &resp.output[0];
1503                    if let GreptimeQueryOutput::Records(r) = json_output {
1504                        assert_eq!(r.num_rows(), 4);
1505                        assert_eq!(r.num_cols(), 2);
1506                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1507                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1508                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1509                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1510                    } else {
1511                        panic!("invalid output type");
1512                    }
1513                }
1514                HttpResponse::InfluxdbV1(resp) => {
1515                    let json_output = &resp.results()[0];
1516                    assert_eq!(json_output.num_rows(), 4);
1517                    assert_eq!(json_output.num_cols(), 2);
1518                    assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
1519                    assert_eq!(
1520                        json_output.series[0].values[0][0],
1521                        serde_json::Value::from(1)
1522                    );
1523                    assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
1524                }
1525                HttpResponse::Csv(resp) => {
1526                    let output = &resp.output()[0];
1527                    if let GreptimeQueryOutput::Records(r) = output {
1528                        assert_eq!(r.num_rows(), 4);
1529                        assert_eq!(r.num_cols(), 2);
1530                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1531                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1532                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1533                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1534                    } else {
1535                        panic!("invalid output type");
1536                    }
1537                }
1538
1539                HttpResponse::Table(resp) => {
1540                    let output = &resp.output()[0];
1541                    if let GreptimeQueryOutput::Records(r) = output {
1542                        assert_eq!(r.num_rows(), 4);
1543                        assert_eq!(r.num_cols(), 2);
1544                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1545                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1546                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1547                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1548                    } else {
1549                        panic!("invalid output type");
1550                    }
1551                }
1552
1553                HttpResponse::Arrow(resp) => {
1554                    let output = resp.data;
1555                    let mut reader =
1556                        FileReader::try_new(Cursor::new(output), None).expect("Arrow reader error");
1557                    let schema = reader.schema();
1558                    assert_eq!(schema.fields[0].name(), "numbers");
1559                    assert_eq!(schema.fields[0].data_type(), &DataType::UInt32);
1560                    assert_eq!(schema.fields[1].name(), "strings");
1561                    assert_eq!(schema.fields[1].data_type(), &DataType::Utf8);
1562
1563                    let rb = reader.next().unwrap().expect("read record batch failed");
1564                    assert_eq!(rb.num_columns(), 2);
1565                    assert_eq!(rb.num_rows(), 4);
1566                }
1567
1568                HttpResponse::Json(resp) => {
1569                    let output = &resp.output()[0];
1570                    if let GreptimeQueryOutput::Records(r) = output {
1571                        assert_eq!(r.num_rows(), 4);
1572                        assert_eq!(r.num_cols(), 2);
1573                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1574                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1575                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1576                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1577                    } else {
1578                        panic!("invalid output type");
1579                    }
1580                }
1581
1582                HttpResponse::Error(err) => unreachable!("{err:?}"),
1583            }
1584        }
1585    }
1586}