servers/
http.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17use std::net::SocketAddr;
18use std::sync::Mutex as StdMutex;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use auth::UserProviderRef;
23use axum::extract::DefaultBodyLimit;
24use axum::http::StatusCode as HttpStatusCode;
25use axum::response::{IntoResponse, Response};
26use axum::serve::ListenerExt;
27use axum::{middleware, routing, Router};
28use common_base::readable_size::ReadableSize;
29use common_base::Plugins;
30use common_recordbatch::RecordBatch;
31use common_telemetry::{error, info};
32use common_time::timestamp::TimeUnit;
33use common_time::Timestamp;
34use datatypes::data_type::DataType;
35use datatypes::schema::SchemaRef;
36use datatypes::value::transform_value_ref_to_json_value;
37use event::{LogState, LogValidatorRef};
38use futures::FutureExt;
39use http::{HeaderValue, Method};
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use snafu::{ensure, ResultExt};
43use tokio::sync::oneshot::{self, Sender};
44use tokio::sync::Mutex;
45use tower::ServiceBuilder;
46use tower_http::compression::CompressionLayer;
47use tower_http::cors::{AllowOrigin, Any, CorsLayer};
48use tower_http::decompression::RequestDecompressionLayer;
49use tower_http::trace::TraceLayer;
50
51use self::authorize::AuthState;
52use self::result::table_result::TableResponse;
53use crate::configurator::ConfiguratorRef;
54use crate::elasticsearch;
55use crate::error::{
56    AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
57    ToJsonSnafu,
58};
59use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
60use crate::http::prom_store::PromStoreState;
61use crate::http::prometheus::{
62    build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
63    range_query, series_query,
64};
65use crate::http::result::arrow_result::ArrowResponse;
66use crate::http::result::csv_result::CsvResponse;
67use crate::http::result::error_result::ErrorResponse;
68use crate::http::result::greptime_result_v1::GreptimedbV1Response;
69use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
70use crate::http::result::json_result::JsonResponse;
71use crate::interceptor::LogIngestInterceptorRef;
72use crate::metrics::http_metrics_layer;
73use crate::metrics_handler::MetricsHandler;
74use crate::prometheus_handler::PrometheusHandlerRef;
75use crate::query_handler::sql::ServerSqlQueryHandlerRef;
76use crate::query_handler::{
77    InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
78    OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
79    PromStoreProtocolHandlerRef,
80};
81use crate::server::Server;
82
83pub mod authorize;
84#[cfg(feature = "dashboard")]
85mod dashboard;
86pub mod dyn_log;
87pub mod event;
88mod extractor;
89pub mod handler;
90pub mod header;
91pub mod influxdb;
92pub mod jaeger;
93pub mod logs;
94pub mod loki;
95pub mod mem_prof;
96pub mod opentsdb;
97pub mod otlp;
98pub mod pprof;
99pub mod prom_store;
100pub mod prometheus;
101pub mod result;
102mod timeout;
103
104pub(crate) use timeout::DynamicTimeoutLayer;
105
106mod hints;
107mod read_preference;
108#[cfg(any(test, feature = "testing"))]
109pub mod test_helpers;
110
/// Version segment of the HTTP API; the builder formats it into route prefixes
/// such as `/{HTTP_API_VERSION}/influxdb`.
pub const HTTP_API_VERSION: &str = "v1";
/// Versioned HTTP API path prefix, including the leading and trailing slash.
pub const HTTP_API_PREFIX: &str = "/v1/";
/// Default http body limit (64M).
const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64);

/// Authorization header
pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth";

// TODO(fys): This is a temporary workaround, it will be improved later
// NOTE(review): presumably the paths that are reachable without authentication —
// confirm against the `authorize` module.
pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
121
/// HTTP server state: the assembled router together with the settings used to serve it.
///
/// Values are produced by `HttpServerBuilder::build`.
#[derive(Default)]
pub struct HttpServer {
    /// Router accumulated by the builder; `make_app` clones it while holding the lock.
    router: StdMutex<Router>,
    /// Sender half of a oneshot channel; the builder initializes it to `None`.
    /// NOTE(review): presumably used to signal shutdown — its use is outside this view.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// Optional user provider handed to the auth middleware in `build`.
    user_provider: Option<UserProviderRef>,

    // plugins
    plugins: Plugins,

    // server configs
    options: HttpOptions,
    /// The builder initializes this to `None`.
    /// NOTE(review): presumably filled with the bound address once the server starts — confirm.
    bind_addr: Option<SocketAddr>,
}
135
/// User-facing configuration of the HTTP server.
///
/// `#[serde(default)]` means any field missing from the input takes its value
/// from `HttpOptions::default()`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpOptions {
    /// Server address string; the default is `127.0.0.1:4000`.
    pub addr: String,

    /// Timeout handed to the timeout layer. A zero duration (the default)
    /// disables the layer entirely.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,

    /// When `true`, the dashboard routes are not registered (only relevant when
    /// the `dashboard` feature is compiled in). Never (de)serialized.
    #[serde(skip)]
    pub disable_dashboard: bool,

    /// Maximum request body size. A size of zero disables the body limit layer.
    pub body_limit: ReadableSize,

    /// NOTE(review): the builder's `with_prom_handler` takes a parameter of the
    /// same name for the Prometheus store state; whether this field feeds it is
    /// not visible here — confirm.
    pub is_strict_mode: bool,

    /// Origins allowed by the CORS layer. An empty list allows any origin.
    pub cors_allowed_origins: Vec<String>,

    /// Whether the CORS layer is installed at all.
    pub enable_cors: bool,
}
155
156impl Default for HttpOptions {
157    fn default() -> Self {
158        Self {
159            addr: "127.0.0.1:4000".to_string(),
160            timeout: Duration::from_secs(0),
161            disable_dashboard: false,
162            body_limit: DEFAULT_BODY_LIMIT,
163            is_strict_mode: false,
164            cors_allowed_origins: Vec::new(),
165            enable_cors: true,
166        }
167    }
168}
169
/// Name and data type name of one output column, as exposed in HTTP responses.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ColumnSchema {
    /// Column name.
    name: String,
    /// Textual name of the column's data type.
    data_type: String,
}
175
176impl ColumnSchema {
177    pub fn new(name: String, data_type: String) -> ColumnSchema {
178        ColumnSchema { name, data_type }
179    }
180}
181
/// Schema of a query result as exposed in HTTP responses: an ordered list of columns.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct OutputSchema {
    /// Columns in result order.
    column_schemas: Vec<ColumnSchema>,
}
186
187impl OutputSchema {
188    pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
189        OutputSchema {
190            column_schemas: columns,
191        }
192    }
193}
194
195impl From<SchemaRef> for OutputSchema {
196    fn from(schema: SchemaRef) -> OutputSchema {
197        OutputSchema {
198            column_schemas: schema
199                .column_schemas()
200                .iter()
201                .map(|cs| ColumnSchema {
202                    name: cs.name.clone(),
203                    data_type: cs.data_type.name(),
204                })
205                .collect(),
206        }
207    }
208}
209
/// Row-oriented query result: the schema plus one JSON value per cell.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct HttpRecordsOutput {
    /// Columns of the result, in the same order as the values inside each row.
    schema: OutputSchema,
    /// One inner vector per row, holding one JSON value per column.
    rows: Vec<Vec<Value>>,
    // total_rows is equal to rows.len() in most cases,
    // the Dashboard query result may be truncated, so we need to return the total_rows.
    #[serde(default)]
    total_rows: usize,

    // plan level execution metrics
    // Omitted from the serialized output when empty; defaults to empty when absent on input.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    #[serde(default)]
    metrics: HashMap<String, Value>,
}
224
225impl HttpRecordsOutput {
226    pub fn num_rows(&self) -> usize {
227        self.rows.len()
228    }
229
230    pub fn num_cols(&self) -> usize {
231        self.schema.column_schemas.len()
232    }
233
234    pub fn schema(&self) -> &OutputSchema {
235        &self.schema
236    }
237
238    pub fn rows(&self) -> &Vec<Vec<Value>> {
239        &self.rows
240    }
241}
242
243impl HttpRecordsOutput {
244    pub fn try_new(
245        schema: SchemaRef,
246        recordbatches: Vec<RecordBatch>,
247    ) -> std::result::Result<HttpRecordsOutput, Error> {
248        if recordbatches.is_empty() {
249            Ok(HttpRecordsOutput {
250                schema: OutputSchema::from(schema),
251                rows: vec![],
252                total_rows: 0,
253                metrics: Default::default(),
254            })
255        } else {
256            let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
257            let mut rows = Vec::with_capacity(num_rows);
258            let schemas = schema.column_schemas();
259            let num_cols = schema.column_schemas().len();
260            rows.resize_with(num_rows, || Vec::with_capacity(num_cols));
261
262            let mut finished_row_cursor = 0;
263            for recordbatch in recordbatches {
264                for (col_idx, col) in recordbatch.columns().iter().enumerate() {
265                    // safety here: schemas length is equal to the number of columns in the recordbatch
266                    let schema = &schemas[col_idx];
267                    for row_idx in 0..recordbatch.num_rows() {
268                        let value = transform_value_ref_to_json_value(col.get_ref(row_idx), schema)
269                            .context(ToJsonSnafu)?;
270                        rows[row_idx + finished_row_cursor].push(value);
271                    }
272                }
273                finished_row_cursor += recordbatch.num_rows();
274            }
275
276            Ok(HttpRecordsOutput {
277                schema: OutputSchema::from(schema),
278                total_rows: rows.len(),
279                rows,
280                metrics: Default::default(),
281            })
282        }
283    }
284}
285
/// Result of a single statement: either an affected-row count or a record set.
///
/// Variant names are serialized in lowercase because of `rename_all = "lowercase"`.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum GreptimeQueryOutput {
    /// Number of rows affected by the statement.
    AffectedRows(usize),
    /// Rows returned by the statement, with their schema.
    Records(HttpRecordsOutput),
}
292
/// Selects the format in which SQL query results are presented.
///
/// The default format is [`ResponseFormat::GreptimedbV1`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
    Arrow,
    Csv,
    Table,
    #[default]
    GreptimedbV1,
    InfluxdbV1,
    Json,
}

impl ResponseFormat {
    /// Every variant, in declaration order; `parse` searches this table.
    const ALL: [ResponseFormat; 6] = [
        ResponseFormat::Arrow,
        ResponseFormat::Csv,
        ResponseFormat::Table,
        ResponseFormat::GreptimedbV1,
        ResponseFormat::InfluxdbV1,
        ResponseFormat::Json,
    ];

    /// Returns the format whose [`as_str`](Self::as_str) name equals `s` exactly
    /// (the comparison is case-sensitive), or `None` for any other input.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|format| format.as_str() == s)
    }

    /// Returns the canonical lowercase name of the format.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Arrow => "arrow",
            Self::Csv => "csv",
            Self::Table => "table",
            Self::GreptimedbV1 => "greptimedb_v1",
            Self::InfluxdbV1 => "influxdb_v1",
            Self::Json => "json",
        }
    }
}
329
/// Timestamp precision selectable through the `epoch` query parameter values
/// accepted by `Epoch::parse` (`ns`, `u`/`µ`, `ms`, `s`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Epoch {
    /// Parsed from `ns`.
    Nanosecond,
    /// Parsed from `u` or `µ`.
    Microsecond,
    /// Parsed from `ms`.
    Millisecond,
    /// Parsed from `s`.
    Second,
}
337
338impl Epoch {
339    pub fn parse(s: &str) -> Option<Epoch> {
340        // Both u and µ indicate microseconds.
341        // epoch = [ns,u,µ,ms,s],
342        // For details, see the Influxdb documents.
343        // https://docs.influxdata.com/influxdb/v1/tools/api/#query-string-parameters-1
344        match s {
345            "ns" => Some(Epoch::Nanosecond),
346            "u" | "µ" => Some(Epoch::Microsecond),
347            "ms" => Some(Epoch::Millisecond),
348            "s" => Some(Epoch::Second),
349            _ => None, // just returns None for other cases
350        }
351    }
352
353    pub fn convert_timestamp(&self, ts: Timestamp) -> Option<Timestamp> {
354        match self {
355            Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond),
356            Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond),
357            Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond),
358            Epoch::Second => ts.convert_to(TimeUnit::Second),
359        }
360    }
361}
362
363impl Display for Epoch {
364    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365        match self {
366            Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"),
367            Epoch::Microsecond => write!(f, "Epoch::Microsecond"),
368            Epoch::Millisecond => write!(f, "Epoch::Millisecond"),
369            Epoch::Second => write!(f, "Epoch::Second"),
370        }
371    }
372}
373
/// A response in any of the supported output formats, plus the error case.
///
/// Each variant wraps the format-specific response type; the `IntoResponse`
/// implementation for this enum delegates to the wrapped value.
#[derive(Serialize, Deserialize, Debug)]
pub enum HttpResponse {
    Arrow(ArrowResponse),
    Csv(CsvResponse),
    Table(TableResponse),
    Error(ErrorResponse),
    GreptimedbV1(GreptimedbV1Response),
    InfluxdbV1(InfluxdbV1Response),
    Json(JsonResponse),
}
384
385impl HttpResponse {
386    pub fn with_execution_time(self, execution_time: u64) -> Self {
387        match self {
388            HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
389            HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
390            HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
391            HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
392            HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
393            HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
394            HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
395        }
396    }
397
398    pub fn with_limit(self, limit: usize) -> Self {
399        match self {
400            HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
401            HttpResponse::Table(resp) => resp.with_limit(limit).into(),
402            HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
403            HttpResponse::Json(resp) => resp.with_limit(limit).into(),
404            _ => self,
405        }
406    }
407}
408
409pub fn process_with_limit(
410    mut outputs: Vec<GreptimeQueryOutput>,
411    limit: usize,
412) -> Vec<GreptimeQueryOutput> {
413    outputs
414        .drain(..)
415        .map(|data| match data {
416            GreptimeQueryOutput::Records(mut records) => {
417                if records.rows.len() > limit {
418                    records.rows.truncate(limit);
419                    records.total_rows = limit;
420                }
421                GreptimeQueryOutput::Records(records)
422            }
423            _ => data,
424        })
425        .collect()
426}
427
428impl IntoResponse for HttpResponse {
429    fn into_response(self) -> Response {
430        match self {
431            HttpResponse::Arrow(resp) => resp.into_response(),
432            HttpResponse::Csv(resp) => resp.into_response(),
433            HttpResponse::Table(resp) => resp.into_response(),
434            HttpResponse::GreptimedbV1(resp) => resp.into_response(),
435            HttpResponse::InfluxdbV1(resp) => resp.into_response(),
436            HttpResponse::Json(resp) => resp.into_response(),
437            HttpResponse::Error(resp) => resp.into_response(),
438        }
439    }
440}
441
442impl From<ArrowResponse> for HttpResponse {
443    fn from(value: ArrowResponse) -> Self {
444        HttpResponse::Arrow(value)
445    }
446}
447
448impl From<CsvResponse> for HttpResponse {
449    fn from(value: CsvResponse) -> Self {
450        HttpResponse::Csv(value)
451    }
452}
453
454impl From<TableResponse> for HttpResponse {
455    fn from(value: TableResponse) -> Self {
456        HttpResponse::Table(value)
457    }
458}
459
460impl From<ErrorResponse> for HttpResponse {
461    fn from(value: ErrorResponse) -> Self {
462        HttpResponse::Error(value)
463    }
464}
465
466impl From<GreptimedbV1Response> for HttpResponse {
467    fn from(value: GreptimedbV1Response) -> Self {
468        HttpResponse::GreptimedbV1(value)
469    }
470}
471
472impl From<InfluxdbV1Response> for HttpResponse {
473    fn from(value: InfluxdbV1Response) -> Self {
474        HttpResponse::InfluxdbV1(value)
475    }
476}
477
478impl From<JsonResponse> for HttpResponse {
479    fn from(value: JsonResponse) -> Self {
480        HttpResponse::Json(value)
481    }
482}
483
/// State passed to the SQL routes built by `HttpServer::route_sql`.
#[derive(Clone)]
pub struct ApiState {
    /// Handler that executes SQL queries on behalf of the HTTP API.
    pub sql_handler: ServerSqlQueryHandlerRef,
}
488
/// State passed to the config routes built by `HttpServer::route_config`.
#[derive(Clone)]
pub struct GreptimeOptionsConfigState {
    /// The configuration options as a string.
    /// NOTE(review): presumably a pre-rendered dump of the effective config — confirm the format.
    pub greptime_config_options: String,
}
493
/// Builder that accumulates routes and settings and finally yields an `HttpServer`.
#[derive(Default)]
pub struct HttpServerBuilder {
    /// Server options copied into the built server.
    options: HttpOptions,
    /// Plugins copied into the built server.
    plugins: Plugins,
    /// Optional user provider copied into the built server.
    user_provider: Option<UserProviderRef>,
    /// Router that every `with_*_handler` call nests or merges routes into.
    router: Router,
}
501
502impl HttpServerBuilder {
503    pub fn new(options: HttpOptions) -> Self {
504        Self {
505            options,
506            plugins: Plugins::default(),
507            user_provider: None,
508            router: Router::new(),
509        }
510    }
511
512    pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
513        let sql_router = HttpServer::route_sql(ApiState { sql_handler });
514
515        Self {
516            router: self
517                .router
518                .nest(&format!("/{HTTP_API_VERSION}"), sql_router),
519            ..self
520        }
521    }
522
523    pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self {
524        let logs_router = HttpServer::route_logs(logs_handler);
525
526        Self {
527            router: self
528                .router
529                .nest(&format!("/{HTTP_API_VERSION}"), logs_router),
530            ..self
531        }
532    }
533
534    pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
535        Self {
536            router: self.router.nest(
537                &format!("/{HTTP_API_VERSION}/opentsdb"),
538                HttpServer::route_opentsdb(handler),
539            ),
540            ..self
541        }
542    }
543
544    pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self {
545        Self {
546            router: self.router.nest(
547                &format!("/{HTTP_API_VERSION}/influxdb"),
548                HttpServer::route_influxdb(handler),
549            ),
550            ..self
551        }
552    }
553
554    pub fn with_prom_handler(
555        self,
556        handler: PromStoreProtocolHandlerRef,
557        prom_store_with_metric_engine: bool,
558        is_strict_mode: bool,
559    ) -> Self {
560        let state = PromStoreState {
561            prom_store_handler: handler,
562            prom_store_with_metric_engine,
563            is_strict_mode,
564        };
565
566        Self {
567            router: self.router.nest(
568                &format!("/{HTTP_API_VERSION}/prometheus"),
569                HttpServer::route_prom(state),
570            ),
571            ..self
572        }
573    }
574
575    pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self {
576        Self {
577            router: self.router.nest(
578                &format!("/{HTTP_API_VERSION}/prometheus/api/v1"),
579                HttpServer::route_prometheus(handler),
580            ),
581            ..self
582        }
583    }
584
585    pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self {
586        Self {
587            router: self.router.nest(
588                &format!("/{HTTP_API_VERSION}/otlp"),
589                HttpServer::route_otlp(handler),
590            ),
591            ..self
592        }
593    }
594
595    pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self {
596        Self {
597            user_provider: Some(user_provider),
598            ..self
599        }
600    }
601
602    pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self {
603        Self {
604            router: self.router.merge(HttpServer::route_metrics(handler)),
605            ..self
606        }
607    }
608
609    pub fn with_log_ingest_handler(
610        self,
611        handler: PipelineHandlerRef,
612        validator: Option<LogValidatorRef>,
613        ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
614    ) -> Self {
615        let log_state = LogState {
616            log_handler: handler,
617            log_validator: validator,
618            ingest_interceptor,
619        };
620
621        let router = self.router.nest(
622            &format!("/{HTTP_API_VERSION}"),
623            HttpServer::route_pipelines(log_state.clone()),
624        );
625        // deprecated since v0.11.0. Use `/logs` and `/pipelines` instead.
626        let router = router.nest(
627            &format!("/{HTTP_API_VERSION}/events"),
628            #[allow(deprecated)]
629            HttpServer::route_log_deprecated(log_state.clone()),
630        );
631
632        let router = router.nest(
633            &format!("/{HTTP_API_VERSION}/loki"),
634            HttpServer::route_loki(log_state.clone()),
635        );
636
637        let router = router.nest(
638            &format!("/{HTTP_API_VERSION}/elasticsearch"),
639            HttpServer::route_elasticsearch(log_state.clone()),
640        );
641
642        let router = router.nest(
643            &format!("/{HTTP_API_VERSION}/elasticsearch/"),
644            Router::new()
645                .route("/", routing::get(elasticsearch::handle_get_version))
646                .with_state(log_state),
647        );
648
649        Self { router, ..self }
650    }
651
652    pub fn with_plugins(self, plugins: Plugins) -> Self {
653        Self { plugins, ..self }
654    }
655
656    pub fn with_greptime_config_options(self, opts: String) -> Self {
657        let config_router = HttpServer::route_config(GreptimeOptionsConfigState {
658            greptime_config_options: opts,
659        });
660
661        Self {
662            router: self.router.merge(config_router),
663            ..self
664        }
665    }
666
667    pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self {
668        Self {
669            router: self.router.nest(
670                &format!("/{HTTP_API_VERSION}/jaeger"),
671                HttpServer::route_jaeger(handler),
672            ),
673            ..self
674        }
675    }
676
677    pub fn with_extra_router(self, router: Router) -> Self {
678        Self {
679            router: self.router.merge(router),
680            ..self
681        }
682    }
683
684    pub fn build(self) -> HttpServer {
685        HttpServer {
686            options: self.options,
687            user_provider: self.user_provider,
688            shutdown_tx: Mutex::new(None),
689            plugins: self.plugins,
690            router: StdMutex::new(self.router),
691            bind_addr: None,
692        }
693    }
694}
695
696impl HttpServer {
697    /// Gets the router and adds necessary root routes (health, status, dashboard).
698    pub fn make_app(&self) -> Router {
699        let mut router = {
700            let router = self.router.lock().unwrap();
701            router.clone()
702        };
703
704        router = router
705            .route(
706                "/health",
707                routing::get(handler::health).post(handler::health),
708            )
709            .route(
710                "/ready",
711                routing::get(handler::health).post(handler::health),
712            );
713
714        router = router.route("/status", routing::get(handler::status));
715
716        #[cfg(feature = "dashboard")]
717        {
718            if !self.options.disable_dashboard {
719                info!("Enable dashboard service at '/dashboard'");
720                // redirect /dashboard to /dashboard/
721                router = router.route(
722                    "/dashboard",
723                    routing::get(|uri: axum::http::uri::Uri| async move {
724                        let path = uri.path();
725                        let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
726
727                        let new_uri = format!("{}/{}", path, query);
728                        axum::response::Redirect::permanent(&new_uri)
729                    }),
730                );
731
732                // "/dashboard" and "/dashboard/" are two different paths in Axum.
733                // We cannot nest "/dashboard/", because we already mapping "/dashboard/{*x}" while nesting "/dashboard".
734                // So we explicitly route "/dashboard/" here.
735                router = router
736                    .route(
737                        "/dashboard/",
738                        routing::get(dashboard::static_handler).post(dashboard::static_handler),
739                    )
740                    .route(
741                        "/dashboard/{*x}",
742                        routing::get(dashboard::static_handler).post(dashboard::static_handler),
743                    );
744            }
745        }
746
747        // Add a layer to collect HTTP metrics for axum.
748        router = router.route_layer(middleware::from_fn(http_metrics_layer));
749
750        router
751    }
752
753    /// Attaches middlewares and debug routes to the router.
754    /// Callers should call this method after [HttpServer::make_app()].
755    pub fn build(&self, router: Router) -> Result<Router> {
756        let timeout_layer = if self.options.timeout != Duration::default() {
757            Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout)))
758        } else {
759            info!("HTTP server timeout is disabled");
760            None
761        };
762        let body_limit_layer = if self.options.body_limit != ReadableSize(0) {
763            Some(
764                ServiceBuilder::new()
765                    .layer(DefaultBodyLimit::max(self.options.body_limit.0 as usize)),
766            )
767        } else {
768            info!("HTTP server body limit is disabled");
769            None
770        };
771        let cors_layer = if self.options.enable_cors {
772            Some(
773                CorsLayer::new()
774                    .allow_methods([
775                        Method::GET,
776                        Method::POST,
777                        Method::PUT,
778                        Method::DELETE,
779                        Method::HEAD,
780                    ])
781                    .allow_origin(if self.options.cors_allowed_origins.is_empty() {
782                        AllowOrigin::from(Any)
783                    } else {
784                        AllowOrigin::from(
785                            self.options
786                                .cors_allowed_origins
787                                .iter()
788                                .map(|s| {
789                                    HeaderValue::from_str(s.as_str())
790                                        .context(InvalidHeaderValueSnafu)
791                                })
792                                .collect::<Result<Vec<HeaderValue>>>()?,
793                        )
794                    })
795                    .allow_headers(Any),
796            )
797        } else {
798            info!("HTTP server cross-origin is disabled");
799            None
800        };
801
802        Ok(router
803            // middlewares
804            .layer(
805                ServiceBuilder::new()
806                    // disable on failure tracing. because printing out isn't very helpful,
807                    // and we have impl IntoResponse for Error. It will print out more detailed error messages
808                    .layer(TraceLayer::new_for_http().on_failure(()))
809                    .option_layer(cors_layer)
810                    .option_layer(timeout_layer)
811                    .option_layer(body_limit_layer)
812                    // auth layer
813                    .layer(middleware::from_fn_with_state(
814                        AuthState::new(self.user_provider.clone()),
815                        authorize::check_http_auth,
816                    ))
817                    .layer(middleware::from_fn(hints::extract_hints))
818                    .layer(middleware::from_fn(
819                        read_preference::extract_read_preference,
820                    )),
821            )
822            // Handlers for debug, we don't expect a timeout.
823            .nest(
824                "/debug",
825                Router::new()
826                    // handler for changing log level dynamically
827                    .route("/log_level", routing::post(dyn_log::dyn_log_handler))
828                    .nest(
829                        "/prof",
830                        Router::new()
831                            .route("/cpu", routing::post(pprof::pprof_handler))
832                            .route("/mem", routing::post(mem_prof::mem_prof_handler)),
833                    ),
834            ))
835    }
836
837    fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
838        Router::new()
839            .route("/metrics", routing::get(handler::metrics))
840            .with_state(metrics_handler)
841    }
842
843    fn route_loki<S>(log_state: LogState) -> Router<S> {
844        Router::new()
845            .route("/api/v1/push", routing::post(loki::loki_ingest))
846            .layer(
847                ServiceBuilder::new()
848                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
849            )
850            .with_state(log_state)
851    }
852
853    fn route_elasticsearch<S>(log_state: LogState) -> Router<S> {
854        Router::new()
855            // Return fake responsefor HEAD '/' request.
856            .route(
857                "/",
858                routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
859            )
860            // Return fake response for Elasticsearch version request.
861            .route("/", routing::get(elasticsearch::handle_get_version))
862            // Return fake response for Elasticsearch license request.
863            .route("/_license", routing::get(elasticsearch::handle_get_license))
864            .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
865            .route(
866                "/{index}/_bulk",
867                routing::post(elasticsearch::handle_bulk_api_with_index),
868            )
869            // Return fake response for Elasticsearch ilm request.
870            .route(
871                "/_ilm/policy/{*path}",
872                routing::any((
873                    HttpStatusCode::OK,
874                    elasticsearch::elasticsearch_headers(),
875                    axum::Json(serde_json::json!({})),
876                )),
877            )
878            // Return fake response for Elasticsearch index template request.
879            .route(
880                "/_index_template/{*path}",
881                routing::any((
882                    HttpStatusCode::OK,
883                    elasticsearch::elasticsearch_headers(),
884                    axum::Json(serde_json::json!({})),
885                )),
886            )
887            // Return fake response for Elasticsearch ingest pipeline request.
888            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/put-pipeline-api.html.
889            .route(
890                "/_ingest/{*path}",
891                routing::any((
892                    HttpStatusCode::OK,
893                    elasticsearch::elasticsearch_headers(),
894                    axum::Json(serde_json::json!({})),
895                )),
896            )
897            // Return fake response for Elasticsearch nodes discovery request.
898            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/cluster.html.
899            .route(
900                "/_nodes/{*path}",
901                routing::any((
902                    HttpStatusCode::OK,
903                    elasticsearch::elasticsearch_headers(),
904                    axum::Json(serde_json::json!({})),
905                )),
906            )
907            // Return fake response for Logstash APIs requests.
908            // See: https://www.elastic.co/guide/en/elasticsearch/reference/8.8/logstash-apis.html
909            .route(
910                "/logstash/{*path}",
911                routing::any((
912                    HttpStatusCode::OK,
913                    elasticsearch::elasticsearch_headers(),
914                    axum::Json(serde_json::json!({})),
915                )),
916            )
917            .route(
918                "/_logstash/{*path}",
919                routing::any((
920                    HttpStatusCode::OK,
921                    elasticsearch::elasticsearch_headers(),
922                    axum::Json(serde_json::json!({})),
923                )),
924            )
925            .layer(ServiceBuilder::new().layer(RequestDecompressionLayer::new()))
926            .with_state(log_state)
927    }
928
929    #[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")]
930    fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
931        Router::new()
932            .route("/logs", routing::post(event::log_ingester))
933            .route(
934                "/pipelines/{pipeline_name}",
935                routing::get(event::query_pipeline),
936            )
937            .route(
938                "/pipelines/{pipeline_name}",
939                routing::post(event::add_pipeline),
940            )
941            .route(
942                "/pipelines/{pipeline_name}",
943                routing::delete(event::delete_pipeline),
944            )
945            .route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
946            .layer(
947                ServiceBuilder::new()
948                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
949            )
950            .with_state(log_state)
951    }
952
953    fn route_pipelines<S>(log_state: LogState) -> Router<S> {
954        Router::new()
955            .route("/ingest", routing::post(event::log_ingester))
956            .route(
957                "/pipelines/{pipeline_name}",
958                routing::get(event::query_pipeline),
959            )
960            .route(
961                "/pipelines/{pipeline_name}",
962                routing::post(event::add_pipeline),
963            )
964            .route(
965                "/pipelines/{pipeline_name}",
966                routing::delete(event::delete_pipeline),
967            )
968            .route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun))
969            .layer(
970                ServiceBuilder::new()
971                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
972            )
973            .with_state(log_state)
974    }
975
976    fn route_sql<S>(api_state: ApiState) -> Router<S> {
977        Router::new()
978            .route("/sql", routing::get(handler::sql).post(handler::sql))
979            .route(
980                "/sql/parse",
981                routing::get(handler::sql_parse).post(handler::sql_parse),
982            )
983            .route(
984                "/promql",
985                routing::get(handler::promql).post(handler::promql),
986            )
987            .with_state(api_state)
988    }
989
990    fn route_logs<S>(log_handler: LogQueryHandlerRef) -> Router<S> {
991        Router::new()
992            .route("/logs", routing::get(logs::logs).post(logs::logs))
993            .with_state(log_handler)
994    }
995
996    /// Route Prometheus [HTTP API].
997    ///
998    /// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
999    fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
1000        Router::new()
1001            .route(
1002                "/format_query",
1003                routing::post(format_query).get(format_query),
1004            )
1005            .route("/status/buildinfo", routing::get(build_info_query))
1006            .route("/query", routing::post(instant_query).get(instant_query))
1007            .route("/query_range", routing::post(range_query).get(range_query))
1008            .route("/labels", routing::post(labels_query).get(labels_query))
1009            .route("/series", routing::post(series_query).get(series_query))
1010            .route("/parse_query", routing::post(parse_query).get(parse_query))
1011            .route(
1012                "/label/{label_name}/values",
1013                routing::get(label_values_query),
1014            )
1015            .layer(ServiceBuilder::new().layer(CompressionLayer::new()))
1016            .with_state(prometheus_handler)
1017    }
1018
1019    /// Route Prometheus remote [read] and [write] API. In other places the related modules are
1020    /// called `prom_store`.
1021    ///
1022    /// [read]: https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/
1023    /// [write]: https://prometheus.io/docs/concepts/remote_write_spec/
1024    fn route_prom<S>(state: PromStoreState) -> Router<S> {
1025        Router::new()
1026            .route("/read", routing::post(prom_store::remote_read))
1027            .route("/write", routing::post(prom_store::remote_write))
1028            .with_state(state)
1029    }
1030
1031    fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
1032        Router::new()
1033            .route("/write", routing::post(influxdb_write_v1))
1034            .route("/api/v2/write", routing::post(influxdb_write_v2))
1035            .layer(
1036                ServiceBuilder::new()
1037                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1038            )
1039            .route("/ping", routing::get(influxdb_ping))
1040            .route("/health", routing::get(influxdb_health))
1041            .with_state(influxdb_handler)
1042    }
1043
1044    fn route_opentsdb<S>(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
1045        Router::new()
1046            .route("/api/put", routing::post(opentsdb::put))
1047            .with_state(opentsdb_handler)
1048    }
1049
1050    fn route_otlp<S>(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
1051        Router::new()
1052            .route("/v1/metrics", routing::post(otlp::metrics))
1053            .route("/v1/traces", routing::post(otlp::traces))
1054            .route("/v1/logs", routing::post(otlp::logs))
1055            .layer(
1056                ServiceBuilder::new()
1057                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1058            )
1059            .with_state(otlp_handler)
1060    }
1061
1062    fn route_config<S>(state: GreptimeOptionsConfigState) -> Router<S> {
1063        Router::new()
1064            .route("/config", routing::get(handler::config))
1065            .with_state(state)
1066    }
1067
1068    fn route_jaeger<S>(handler: JaegerQueryHandlerRef) -> Router<S> {
1069        Router::new()
1070            .route("/api/services", routing::get(jaeger::handle_get_services))
1071            .route(
1072                "/api/services/{service_name}/operations",
1073                routing::get(jaeger::handle_get_operations_by_service),
1074            )
1075            .route(
1076                "/api/operations",
1077                routing::get(jaeger::handle_get_operations),
1078            )
1079            .route("/api/traces", routing::get(jaeger::handle_find_traces))
1080            .route(
1081                "/api/traces/{trace_id}",
1082                routing::get(jaeger::handle_get_trace),
1083            )
1084            .with_state(handler)
1085    }
1086}
1087
/// Name returned by the `Server::name` implementation of `HttpServer`.
pub const HTTP_SERVER: &str = "HTTP_SERVER";
1089
1090#[async_trait]
1091impl Server for HttpServer {
1092    async fn shutdown(&self) -> Result<()> {
1093        let mut shutdown_tx = self.shutdown_tx.lock().await;
1094        if let Some(tx) = shutdown_tx.take() {
1095            if tx.send(()).is_err() {
1096                info!("Receiver dropped, the HTTP server has already existed");
1097            }
1098        }
1099        info!("Shutdown HTTP server");
1100
1101        Ok(())
1102    }
1103
1104    async fn start(&mut self, listening: SocketAddr) -> Result<()> {
1105        let (tx, rx) = oneshot::channel();
1106        let serve = {
1107            let mut shutdown_tx = self.shutdown_tx.lock().await;
1108            ensure!(
1109                shutdown_tx.is_none(),
1110                AlreadyStartedSnafu { server: "HTTP" }
1111            );
1112
1113            let mut app = self.make_app();
1114            if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
1115                app = configurator.config_http(app);
1116            }
1117            let app = self.build(app)?;
1118            let listener = tokio::net::TcpListener::bind(listening)
1119                .await
1120                .context(AddressBindSnafu { addr: listening })?
1121                .tap_io(|tcp_stream| {
1122                    if let Err(e) = tcp_stream.set_nodelay(true) {
1123                        error!(e; "Failed to set TCP_NODELAY on incoming connection");
1124                    }
1125                });
1126            let serve = axum::serve(listener, app.into_make_service());
1127
1128            // FIXME(yingwen): Support keepalive.
1129            // See:
1130            // - https://github.com/tokio-rs/axum/discussions/2939
1131            // - https://stackoverflow.com/questions/73069718/how-do-i-keep-alive-tokiotcpstream-in-rust
1132            // let server = axum::Server::try_bind(&listening)
1133            //     .with_context(|_| AddressBindSnafu { addr: listening })?
1134            //     .tcp_nodelay(true)
1135            //     // Enable TCP keepalive to close the dangling established connections.
1136            //     // It's configured to let the keepalive probes first send after the connection sits
1137            //     // idle for 59 minutes, and then send every 10 seconds for 6 times.
1138            //     // So the connection will be closed after roughly 1 hour.
1139            //     .tcp_keepalive(Some(Duration::from_secs(59 * 60)))
1140            //     .tcp_keepalive_interval(Some(Duration::from_secs(10)))
1141            //     .tcp_keepalive_retries(Some(6))
1142            //     .serve(app.into_make_service());
1143
1144            *shutdown_tx = Some(tx);
1145
1146            serve
1147        };
1148        let listening = serve.local_addr().context(InternalIoSnafu)?;
1149        info!("HTTP server is bound to {}", listening);
1150
1151        common_runtime::spawn_global(async move {
1152            if let Err(e) = serve
1153                .with_graceful_shutdown(rx.map(drop))
1154                .await
1155                .context(InternalIoSnafu)
1156            {
1157                error!(e; "Failed to shutdown http server");
1158            }
1159        });
1160
1161        self.bind_addr = Some(listening);
1162        Ok(())
1163    }
1164
1165    fn name(&self) -> &str {
1166        HTTP_SERVER
1167    }
1168
1169    fn bind_addr(&self) -> Option<SocketAddr> {
1170        self.bind_addr
1171    }
1172}
1173
1174#[cfg(test)]
1175mod test {
1176    use std::future::pending;
1177    use std::io::Cursor;
1178    use std::sync::Arc;
1179
1180    use arrow_ipc::reader::FileReader;
1181    use arrow_schema::DataType;
1182    use axum::handler::Handler;
1183    use axum::http::StatusCode;
1184    use axum::routing::get;
1185    use common_query::Output;
1186    use common_recordbatch::RecordBatches;
1187    use datafusion_expr::LogicalPlan;
1188    use datatypes::prelude::*;
1189    use datatypes::schema::{ColumnSchema, Schema};
1190    use datatypes::vectors::{StringVector, UInt32Vector};
1191    use header::constants::GREPTIME_DB_HEADER_TIMEOUT;
1192    use query::parser::PromQuery;
1193    use query::query_engine::DescribeResult;
1194    use session::context::QueryContextRef;
1195    use tokio::sync::mpsc;
1196    use tokio::time::Instant;
1197
1198    use super::*;
1199    use crate::error::Error;
1200    use crate::http::test_helpers::TestClient;
1201    use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
1202
    /// Stub SQL handler used to build an `HttpServer` in these tests.
    struct DummyInstance {
        // Only ever assigned in this module, never read.
        // NOTE(review): presumably held just to keep the channel open — confirm.
        _tx: mpsc::Sender<(String, Vec<u8>)>,
    }
1206
1207    #[async_trait]
1208    impl SqlQueryHandler for DummyInstance {
1209        type Error = Error;
1210
1211        async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
1212            unimplemented!()
1213        }
1214
1215        async fn do_promql_query(
1216            &self,
1217            _: &PromQuery,
1218            _: QueryContextRef,
1219        ) -> Vec<std::result::Result<Output, Self::Error>> {
1220            unimplemented!()
1221        }
1222
1223        async fn do_exec_plan(
1224            &self,
1225            _plan: LogicalPlan,
1226            _query_ctx: QueryContextRef,
1227        ) -> std::result::Result<Output, Self::Error> {
1228            unimplemented!()
1229        }
1230
1231        async fn do_describe(
1232            &self,
1233            _stmt: sql::statements::statement::Statement,
1234            _query_ctx: QueryContextRef,
1235        ) -> Result<Option<DescribeResult>> {
1236            unimplemented!()
1237        }
1238
1239        async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
1240            Ok(true)
1241        }
1242    }
1243
1244    fn timeout() -> DynamicTimeoutLayer {
1245        DynamicTimeoutLayer::new(Duration::from_millis(10))
1246    }
1247
1248    async fn forever() {
1249        pending().await
1250    }
1251
1252    fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
1253        make_test_app_custom(tx, HttpOptions::default())
1254    }
1255
1256    fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
1257        let instance = Arc::new(DummyInstance { _tx: tx });
1258        let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
1259        let server = HttpServerBuilder::new(options)
1260            .with_sql_handler(sql_instance)
1261            .build();
1262        server.build(server.make_app()).unwrap().route(
1263            "/test/timeout",
1264            get(forever.layer(ServiceBuilder::new().layer(timeout()))),
1265        )
1266    }
1267
1268    #[tokio::test]
1269    pub async fn test_cors() {
1270        // cors is on by default
1271        let (tx, _rx) = mpsc::channel(100);
1272        let app = make_test_app(tx);
1273        let client = TestClient::new(app).await;
1274
1275        let res = client.get("/health").send().await;
1276
1277        assert_eq!(res.status(), StatusCode::OK);
1278        assert_eq!(
1279            res.headers()
1280                .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1281                .expect("expect cors header origin"),
1282            "*"
1283        );
1284
1285        let res = client
1286            .options("/health")
1287            .header("Access-Control-Request-Headers", "x-greptime-auth")
1288            .header("Access-Control-Request-Method", "DELETE")
1289            .header("Origin", "https://example.com")
1290            .send()
1291            .await;
1292        assert_eq!(res.status(), StatusCode::OK);
1293        assert_eq!(
1294            res.headers()
1295                .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1296                .expect("expect cors header origin"),
1297            "*"
1298        );
1299        assert_eq!(
1300            res.headers()
1301                .get(http::header::ACCESS_CONTROL_ALLOW_HEADERS)
1302                .expect("expect cors header headers"),
1303            "*"
1304        );
1305        assert_eq!(
1306            res.headers()
1307                .get(http::header::ACCESS_CONTROL_ALLOW_METHODS)
1308                .expect("expect cors header methods"),
1309            "GET,POST,PUT,DELETE,HEAD"
1310        );
1311    }
1312
1313    #[tokio::test]
1314    pub async fn test_cors_custom_origins() {
1315        // cors is on by default
1316        let (tx, _rx) = mpsc::channel(100);
1317        let origin = "https://example.com";
1318
1319        let options = HttpOptions {
1320            cors_allowed_origins: vec![origin.to_string()],
1321            ..Default::default()
1322        };
1323
1324        let app = make_test_app_custom(tx, options);
1325        let client = TestClient::new(app).await;
1326
1327        let res = client.get("/health").header("Origin", origin).send().await;
1328
1329        assert_eq!(res.status(), StatusCode::OK);
1330        assert_eq!(
1331            res.headers()
1332                .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1333                .expect("expect cors header origin"),
1334            origin
1335        );
1336
1337        let res = client
1338            .get("/health")
1339            .header("Origin", "https://notallowed.com")
1340            .send()
1341            .await;
1342
1343        assert_eq!(res.status(), StatusCode::OK);
1344        assert!(!res
1345            .headers()
1346            .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1347    }
1348
1349    #[tokio::test]
1350    pub async fn test_cors_disabled() {
1351        // cors is on by default
1352        let (tx, _rx) = mpsc::channel(100);
1353
1354        let options = HttpOptions {
1355            enable_cors: false,
1356            ..Default::default()
1357        };
1358
1359        let app = make_test_app_custom(tx, options);
1360        let client = TestClient::new(app).await;
1361
1362        let res = client.get("/health").send().await;
1363
1364        assert_eq!(res.status(), StatusCode::OK);
1365        assert!(!res
1366            .headers()
1367            .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1368    }
1369
1370    #[test]
1371    fn test_http_options_default() {
1372        let default = HttpOptions::default();
1373        assert_eq!("127.0.0.1:4000".to_string(), default.addr);
1374        assert_eq!(Duration::from_secs(0), default.timeout)
1375    }
1376
1377    #[tokio::test]
1378    async fn test_http_server_request_timeout() {
1379        common_telemetry::init_default_ut_logging();
1380
1381        let (tx, _rx) = mpsc::channel(100);
1382        let app = make_test_app(tx);
1383        let client = TestClient::new(app).await;
1384        let res = client.get("/test/timeout").send().await;
1385        assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1386
1387        let now = Instant::now();
1388        let res = client
1389            .get("/test/timeout")
1390            .header(GREPTIME_DB_HEADER_TIMEOUT, "20ms")
1391            .send()
1392            .await;
1393        assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1394        let elapsed = now.elapsed();
1395        assert!(elapsed > Duration::from_millis(15));
1396
1397        tokio::time::timeout(
1398            Duration::from_millis(15),
1399            client
1400                .get("/test/timeout")
1401                .header(GREPTIME_DB_HEADER_TIMEOUT, "0s")
1402                .send(),
1403        )
1404        .await
1405        .unwrap_err();
1406
1407        tokio::time::timeout(
1408            Duration::from_millis(15),
1409            client
1410                .get("/test/timeout")
1411                .header(
1412                    GREPTIME_DB_HEADER_TIMEOUT,
1413                    humantime::format_duration(Duration::default()).to_string(),
1414                )
1415                .send(),
1416        )
1417        .await
1418        .unwrap_err();
1419    }
1420
1421    #[tokio::test]
1422    async fn test_schema_for_empty_response() {
1423        let column_schemas = vec![
1424            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1425            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1426        ];
1427        let schema = Arc::new(Schema::new(column_schemas));
1428
1429        let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
1430        let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1431
1432        let json_resp = GreptimedbV1Response::from_output(outputs).await;
1433        if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
1434            let json_output = &json_resp.output[0];
1435            if let GreptimeQueryOutput::Records(r) = json_output {
1436                assert_eq!(r.num_rows(), 0);
1437                assert_eq!(r.num_cols(), 2);
1438                assert_eq!(r.schema.column_schemas[0].name, "numbers");
1439                assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1440            } else {
1441                panic!("invalid output type");
1442            }
1443        } else {
1444            panic!("invalid format")
1445        }
1446    }
1447
1448    #[tokio::test]
1449    async fn test_recordbatches_conversion() {
1450        let column_schemas = vec![
1451            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1452            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1453        ];
1454        let schema = Arc::new(Schema::new(column_schemas));
1455        let columns: Vec<VectorRef> = vec![
1456            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
1457            Arc::new(StringVector::from(vec![
1458                None,
1459                Some("hello"),
1460                Some("greptime"),
1461                None,
1462            ])),
1463        ];
1464        let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
1465
1466        for format in [
1467            ResponseFormat::GreptimedbV1,
1468            ResponseFormat::InfluxdbV1,
1469            ResponseFormat::Csv,
1470            ResponseFormat::Table,
1471            ResponseFormat::Arrow,
1472            ResponseFormat::Json,
1473        ] {
1474            let recordbatches =
1475                RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
1476            let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1477            let json_resp = match format {
1478                ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
1479                ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
1480                ResponseFormat::Table => TableResponse::from_output(outputs).await,
1481                ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
1482                ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
1483                ResponseFormat::Json => JsonResponse::from_output(outputs).await,
1484            };
1485
1486            match json_resp {
1487                HttpResponse::GreptimedbV1(resp) => {
1488                    let json_output = &resp.output[0];
1489                    if let GreptimeQueryOutput::Records(r) = json_output {
1490                        assert_eq!(r.num_rows(), 4);
1491                        assert_eq!(r.num_cols(), 2);
1492                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1493                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1494                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1495                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1496                    } else {
1497                        panic!("invalid output type");
1498                    }
1499                }
1500                HttpResponse::InfluxdbV1(resp) => {
1501                    let json_output = &resp.results()[0];
1502                    assert_eq!(json_output.num_rows(), 4);
1503                    assert_eq!(json_output.num_cols(), 2);
1504                    assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
1505                    assert_eq!(
1506                        json_output.series[0].values[0][0],
1507                        serde_json::Value::from(1)
1508                    );
1509                    assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
1510                }
1511                HttpResponse::Csv(resp) => {
1512                    let output = &resp.output()[0];
1513                    if let GreptimeQueryOutput::Records(r) = output {
1514                        assert_eq!(r.num_rows(), 4);
1515                        assert_eq!(r.num_cols(), 2);
1516                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1517                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1518                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1519                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1520                    } else {
1521                        panic!("invalid output type");
1522                    }
1523                }
1524
1525                HttpResponse::Table(resp) => {
1526                    let output = &resp.output()[0];
1527                    if let GreptimeQueryOutput::Records(r) = output {
1528                        assert_eq!(r.num_rows(), 4);
1529                        assert_eq!(r.num_cols(), 2);
1530                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1531                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1532                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1533                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1534                    } else {
1535                        panic!("invalid output type");
1536                    }
1537                }
1538
1539                HttpResponse::Arrow(resp) => {
1540                    let output = resp.data;
1541                    let mut reader =
1542                        FileReader::try_new(Cursor::new(output), None).expect("Arrow reader error");
1543                    let schema = reader.schema();
1544                    assert_eq!(schema.fields[0].name(), "numbers");
1545                    assert_eq!(schema.fields[0].data_type(), &DataType::UInt32);
1546                    assert_eq!(schema.fields[1].name(), "strings");
1547                    assert_eq!(schema.fields[1].data_type(), &DataType::Utf8);
1548
1549                    let rb = reader.next().unwrap().expect("read record batch failed");
1550                    assert_eq!(rb.num_columns(), 2);
1551                    assert_eq!(rb.num_rows(), 4);
1552                }
1553
1554                HttpResponse::Json(resp) => {
1555                    let output = &resp.output()[0];
1556                    if let GreptimeQueryOutput::Records(r) = output {
1557                        assert_eq!(r.num_rows(), 4);
1558                        assert_eq!(r.num_cols(), 2);
1559                        assert_eq!(r.schema.column_schemas[0].name, "numbers");
1560                        assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1561                        assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1562                        assert_eq!(r.rows[0][1], serde_json::Value::Null);
1563                    } else {
1564                        panic!("invalid output type");
1565                    }
1566                }
1567
1568                HttpResponse::Error(err) => unreachable!("{err:?}"),
1569            }
1570        }
1571    }
1572}