1use std::collections::HashMap;
16use std::fmt::Display;
17use std::net::SocketAddr;
18use std::sync::Mutex as StdMutex;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use auth::UserProviderRef;
23use axum::extract::DefaultBodyLimit;
24use axum::http::StatusCode as HttpStatusCode;
25use axum::response::{IntoResponse, Response};
26use axum::serve::ListenerExt;
27use axum::{middleware, routing, Router};
28use common_base::readable_size::ReadableSize;
29use common_base::Plugins;
30use common_recordbatch::RecordBatch;
31use common_telemetry::{error, info};
32use common_time::timestamp::TimeUnit;
33use common_time::Timestamp;
34use datatypes::data_type::DataType;
35use datatypes::schema::SchemaRef;
36use datatypes::value::transform_value_ref_to_json_value;
37use event::{LogState, LogValidatorRef};
38use futures::FutureExt;
39use http::{HeaderValue, Method};
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use snafu::{ensure, ResultExt};
43use tokio::sync::oneshot::{self, Sender};
44use tokio::sync::Mutex;
45use tower::ServiceBuilder;
46use tower_http::compression::CompressionLayer;
47use tower_http::cors::{AllowOrigin, Any, CorsLayer};
48use tower_http::decompression::RequestDecompressionLayer;
49use tower_http::trace::TraceLayer;
50
51use self::authorize::AuthState;
52use self::result::table_result::TableResponse;
53use crate::configurator::ConfiguratorRef;
54use crate::elasticsearch;
55use crate::error::{
56 AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
57 ToJsonSnafu,
58};
59use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
60use crate::http::prom_store::PromStoreState;
61use crate::http::prometheus::{
62 build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
63 range_query, series_query,
64};
65use crate::http::result::arrow_result::ArrowResponse;
66use crate::http::result::csv_result::CsvResponse;
67use crate::http::result::error_result::ErrorResponse;
68use crate::http::result::greptime_result_v1::GreptimedbV1Response;
69use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
70use crate::http::result::json_result::JsonResponse;
71use crate::interceptor::LogIngestInterceptorRef;
72use crate::metrics::http_metrics_layer;
73use crate::metrics_handler::MetricsHandler;
74use crate::prometheus_handler::PrometheusHandlerRef;
75use crate::query_handler::sql::ServerSqlQueryHandlerRef;
76use crate::query_handler::{
77 InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
78 OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
79 PromStoreProtocolHandlerRef,
80};
81use crate::server::Server;
82
83pub mod authorize;
84#[cfg(feature = "dashboard")]
85mod dashboard;
86pub mod dyn_log;
87pub mod event;
88mod extractor;
89pub mod handler;
90pub mod header;
91pub mod influxdb;
92pub mod jaeger;
93pub mod logs;
94pub mod loki;
95pub mod mem_prof;
96pub mod opentsdb;
97pub mod otlp;
98pub mod pprof;
99pub mod prom_store;
100pub mod prometheus;
101pub mod result;
102mod timeout;
103
104pub(crate) use timeout::DynamicTimeoutLayer;
105
106mod hints;
107mod read_preference;
108#[cfg(any(test, feature = "testing"))]
109pub mod test_helpers;
110
111pub const HTTP_API_VERSION: &str = "v1";
112pub const HTTP_API_PREFIX: &str = "/v1/";
113const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64);
115
116pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth";
118
119pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
121
122#[derive(Default)]
123pub struct HttpServer {
124 router: StdMutex<Router>,
125 shutdown_tx: Mutex<Option<Sender<()>>>,
126 user_provider: Option<UserProviderRef>,
127
128 plugins: Plugins,
130
131 options: HttpOptions,
133 bind_addr: Option<SocketAddr>,
134}
135
136#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
137#[serde(default)]
138pub struct HttpOptions {
139 pub addr: String,
140
141 #[serde(with = "humantime_serde")]
142 pub timeout: Duration,
143
144 #[serde(skip)]
145 pub disable_dashboard: bool,
146
147 pub body_limit: ReadableSize,
148
149 pub prom_validation_mode: PromValidationMode,
151
152 pub cors_allowed_origins: Vec<String>,
153
154 pub enable_cors: bool,
155}
156
157#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum PromValidationMode {
160 Strict,
162 Lossy,
164 Unchecked,
166}
167
168impl Default for HttpOptions {
169 fn default() -> Self {
170 Self {
171 addr: "127.0.0.1:4000".to_string(),
172 timeout: Duration::from_secs(0),
173 disable_dashboard: false,
174 body_limit: DEFAULT_BODY_LIMIT,
175 cors_allowed_origins: Vec::new(),
176 enable_cors: true,
177 prom_validation_mode: PromValidationMode::Strict,
178 }
179 }
180}
181
182#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
183pub struct ColumnSchema {
184 name: String,
185 data_type: String,
186}
187
188impl ColumnSchema {
189 pub fn new(name: String, data_type: String) -> ColumnSchema {
190 ColumnSchema { name, data_type }
191 }
192}
193
194#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
195pub struct OutputSchema {
196 column_schemas: Vec<ColumnSchema>,
197}
198
199impl OutputSchema {
200 pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
201 OutputSchema {
202 column_schemas: columns,
203 }
204 }
205}
206
207impl From<SchemaRef> for OutputSchema {
208 fn from(schema: SchemaRef) -> OutputSchema {
209 OutputSchema {
210 column_schemas: schema
211 .column_schemas()
212 .iter()
213 .map(|cs| ColumnSchema {
214 name: cs.name.clone(),
215 data_type: cs.data_type.name(),
216 })
217 .collect(),
218 }
219 }
220}
221
222#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
223pub struct HttpRecordsOutput {
224 schema: OutputSchema,
225 rows: Vec<Vec<Value>>,
226 #[serde(default)]
229 total_rows: usize,
230
231 #[serde(skip_serializing_if = "HashMap::is_empty")]
233 #[serde(default)]
234 metrics: HashMap<String, Value>,
235}
236
237impl HttpRecordsOutput {
238 pub fn num_rows(&self) -> usize {
239 self.rows.len()
240 }
241
242 pub fn num_cols(&self) -> usize {
243 self.schema.column_schemas.len()
244 }
245
246 pub fn schema(&self) -> &OutputSchema {
247 &self.schema
248 }
249
250 pub fn rows(&self) -> &Vec<Vec<Value>> {
251 &self.rows
252 }
253}
254
255impl HttpRecordsOutput {
256 pub fn try_new(
257 schema: SchemaRef,
258 recordbatches: Vec<RecordBatch>,
259 ) -> std::result::Result<HttpRecordsOutput, Error> {
260 if recordbatches.is_empty() {
261 Ok(HttpRecordsOutput {
262 schema: OutputSchema::from(schema),
263 rows: vec![],
264 total_rows: 0,
265 metrics: Default::default(),
266 })
267 } else {
268 let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
269 let mut rows = Vec::with_capacity(num_rows);
270 let schemas = schema.column_schemas();
271 let num_cols = schema.column_schemas().len();
272 rows.resize_with(num_rows, || Vec::with_capacity(num_cols));
273
274 let mut finished_row_cursor = 0;
275 for recordbatch in recordbatches {
276 for (col_idx, col) in recordbatch.columns().iter().enumerate() {
277 let schema = &schemas[col_idx];
279 for row_idx in 0..recordbatch.num_rows() {
280 let value = transform_value_ref_to_json_value(col.get_ref(row_idx), schema)
281 .context(ToJsonSnafu)?;
282 rows[row_idx + finished_row_cursor].push(value);
283 }
284 }
285 finished_row_cursor += recordbatch.num_rows();
286 }
287
288 Ok(HttpRecordsOutput {
289 schema: OutputSchema::from(schema),
290 total_rows: rows.len(),
291 rows,
292 metrics: Default::default(),
293 })
294 }
295 }
296}
297
298#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
299#[serde(rename_all = "lowercase")]
300pub enum GreptimeQueryOutput {
301 AffectedRows(usize),
302 Records(HttpRecordsOutput),
303}
304
305#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
307pub enum ResponseFormat {
308 Arrow,
309 Csv,
310 Table,
311 #[default]
312 GreptimedbV1,
313 InfluxdbV1,
314 Json,
315}
316
317impl ResponseFormat {
318 pub fn parse(s: &str) -> Option<Self> {
319 match s {
320 "arrow" => Some(ResponseFormat::Arrow),
321 "csv" => Some(ResponseFormat::Csv),
322 "table" => Some(ResponseFormat::Table),
323 "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
324 "influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
325 "json" => Some(ResponseFormat::Json),
326 _ => None,
327 }
328 }
329
330 pub fn as_str(&self) -> &'static str {
331 match self {
332 ResponseFormat::Arrow => "arrow",
333 ResponseFormat::Csv => "csv",
334 ResponseFormat::Table => "table",
335 ResponseFormat::GreptimedbV1 => "greptimedb_v1",
336 ResponseFormat::InfluxdbV1 => "influxdb_v1",
337 ResponseFormat::Json => "json",
338 }
339 }
340}
341
342#[derive(Debug, Clone, Copy, PartialEq, Eq)]
343pub enum Epoch {
344 Nanosecond,
345 Microsecond,
346 Millisecond,
347 Second,
348}
349
350impl Epoch {
351 pub fn parse(s: &str) -> Option<Epoch> {
352 match s {
357 "ns" => Some(Epoch::Nanosecond),
358 "u" | "µ" => Some(Epoch::Microsecond),
359 "ms" => Some(Epoch::Millisecond),
360 "s" => Some(Epoch::Second),
361 _ => None, }
363 }
364
365 pub fn convert_timestamp(&self, ts: Timestamp) -> Option<Timestamp> {
366 match self {
367 Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond),
368 Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond),
369 Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond),
370 Epoch::Second => ts.convert_to(TimeUnit::Second),
371 }
372 }
373}
374
375impl Display for Epoch {
376 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377 match self {
378 Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"),
379 Epoch::Microsecond => write!(f, "Epoch::Microsecond"),
380 Epoch::Millisecond => write!(f, "Epoch::Millisecond"),
381 Epoch::Second => write!(f, "Epoch::Second"),
382 }
383 }
384}
385
386#[derive(Serialize, Deserialize, Debug)]
387pub enum HttpResponse {
388 Arrow(ArrowResponse),
389 Csv(CsvResponse),
390 Table(TableResponse),
391 Error(ErrorResponse),
392 GreptimedbV1(GreptimedbV1Response),
393 InfluxdbV1(InfluxdbV1Response),
394 Json(JsonResponse),
395}
396
397impl HttpResponse {
398 pub fn with_execution_time(self, execution_time: u64) -> Self {
399 match self {
400 HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
401 HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
402 HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
403 HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
404 HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
405 HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
406 HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
407 }
408 }
409
410 pub fn with_limit(self, limit: usize) -> Self {
411 match self {
412 HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
413 HttpResponse::Table(resp) => resp.with_limit(limit).into(),
414 HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
415 HttpResponse::Json(resp) => resp.with_limit(limit).into(),
416 _ => self,
417 }
418 }
419}
420
421pub fn process_with_limit(
422 mut outputs: Vec<GreptimeQueryOutput>,
423 limit: usize,
424) -> Vec<GreptimeQueryOutput> {
425 outputs
426 .drain(..)
427 .map(|data| match data {
428 GreptimeQueryOutput::Records(mut records) => {
429 if records.rows.len() > limit {
430 records.rows.truncate(limit);
431 records.total_rows = limit;
432 }
433 GreptimeQueryOutput::Records(records)
434 }
435 _ => data,
436 })
437 .collect()
438}
439
440impl IntoResponse for HttpResponse {
441 fn into_response(self) -> Response {
442 match self {
443 HttpResponse::Arrow(resp) => resp.into_response(),
444 HttpResponse::Csv(resp) => resp.into_response(),
445 HttpResponse::Table(resp) => resp.into_response(),
446 HttpResponse::GreptimedbV1(resp) => resp.into_response(),
447 HttpResponse::InfluxdbV1(resp) => resp.into_response(),
448 HttpResponse::Json(resp) => resp.into_response(),
449 HttpResponse::Error(resp) => resp.into_response(),
450 }
451 }
452}
453
454impl From<ArrowResponse> for HttpResponse {
455 fn from(value: ArrowResponse) -> Self {
456 HttpResponse::Arrow(value)
457 }
458}
459
460impl From<CsvResponse> for HttpResponse {
461 fn from(value: CsvResponse) -> Self {
462 HttpResponse::Csv(value)
463 }
464}
465
466impl From<TableResponse> for HttpResponse {
467 fn from(value: TableResponse) -> Self {
468 HttpResponse::Table(value)
469 }
470}
471
472impl From<ErrorResponse> for HttpResponse {
473 fn from(value: ErrorResponse) -> Self {
474 HttpResponse::Error(value)
475 }
476}
477
478impl From<GreptimedbV1Response> for HttpResponse {
479 fn from(value: GreptimedbV1Response) -> Self {
480 HttpResponse::GreptimedbV1(value)
481 }
482}
483
484impl From<InfluxdbV1Response> for HttpResponse {
485 fn from(value: InfluxdbV1Response) -> Self {
486 HttpResponse::InfluxdbV1(value)
487 }
488}
489
490impl From<JsonResponse> for HttpResponse {
491 fn from(value: JsonResponse) -> Self {
492 HttpResponse::Json(value)
493 }
494}
495
496#[derive(Clone)]
497pub struct ApiState {
498 pub sql_handler: ServerSqlQueryHandlerRef,
499}
500
501#[derive(Clone)]
502pub struct GreptimeOptionsConfigState {
503 pub greptime_config_options: String,
504}
505
506#[derive(Default)]
507pub struct HttpServerBuilder {
508 options: HttpOptions,
509 plugins: Plugins,
510 user_provider: Option<UserProviderRef>,
511 router: Router,
512}
513
514impl HttpServerBuilder {
515 pub fn new(options: HttpOptions) -> Self {
516 Self {
517 options,
518 plugins: Plugins::default(),
519 user_provider: None,
520 router: Router::new(),
521 }
522 }
523
524 pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
525 let sql_router = HttpServer::route_sql(ApiState { sql_handler });
526
527 Self {
528 router: self
529 .router
530 .nest(&format!("/{HTTP_API_VERSION}"), sql_router),
531 ..self
532 }
533 }
534
535 pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self {
536 let logs_router = HttpServer::route_logs(logs_handler);
537
538 Self {
539 router: self
540 .router
541 .nest(&format!("/{HTTP_API_VERSION}"), logs_router),
542 ..self
543 }
544 }
545
546 pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
547 Self {
548 router: self.router.nest(
549 &format!("/{HTTP_API_VERSION}/opentsdb"),
550 HttpServer::route_opentsdb(handler),
551 ),
552 ..self
553 }
554 }
555
556 pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self {
557 Self {
558 router: self.router.nest(
559 &format!("/{HTTP_API_VERSION}/influxdb"),
560 HttpServer::route_influxdb(handler),
561 ),
562 ..self
563 }
564 }
565
566 pub fn with_prom_handler(
567 self,
568 handler: PromStoreProtocolHandlerRef,
569 pipeline_handler: Option<PipelineHandlerRef>,
570 prom_store_with_metric_engine: bool,
571 prom_validation_mode: PromValidationMode,
572 ) -> Self {
573 let state = PromStoreState {
574 prom_store_handler: handler,
575 pipeline_handler,
576 prom_store_with_metric_engine,
577 prom_validation_mode,
578 };
579
580 Self {
581 router: self.router.nest(
582 &format!("/{HTTP_API_VERSION}/prometheus"),
583 HttpServer::route_prom(state),
584 ),
585 ..self
586 }
587 }
588
589 pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self {
590 Self {
591 router: self.router.nest(
592 &format!("/{HTTP_API_VERSION}/prometheus/api/v1"),
593 HttpServer::route_prometheus(handler),
594 ),
595 ..self
596 }
597 }
598
599 pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self {
600 Self {
601 router: self.router.nest(
602 &format!("/{HTTP_API_VERSION}/otlp"),
603 HttpServer::route_otlp(handler),
604 ),
605 ..self
606 }
607 }
608
609 pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self {
610 Self {
611 user_provider: Some(user_provider),
612 ..self
613 }
614 }
615
616 pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self {
617 Self {
618 router: self.router.merge(HttpServer::route_metrics(handler)),
619 ..self
620 }
621 }
622
623 pub fn with_log_ingest_handler(
624 self,
625 handler: PipelineHandlerRef,
626 validator: Option<LogValidatorRef>,
627 ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
628 ) -> Self {
629 let log_state = LogState {
630 log_handler: handler,
631 log_validator: validator,
632 ingest_interceptor,
633 };
634
635 let router = self.router.nest(
636 &format!("/{HTTP_API_VERSION}"),
637 HttpServer::route_pipelines(log_state.clone()),
638 );
639 let router = router.nest(
641 &format!("/{HTTP_API_VERSION}/events"),
642 #[allow(deprecated)]
643 HttpServer::route_log_deprecated(log_state.clone()),
644 );
645
646 let router = router.nest(
647 &format!("/{HTTP_API_VERSION}/loki"),
648 HttpServer::route_loki(log_state.clone()),
649 );
650
651 let router = router.nest(
652 &format!("/{HTTP_API_VERSION}/elasticsearch"),
653 HttpServer::route_elasticsearch(log_state.clone()),
654 );
655
656 let router = router.nest(
657 &format!("/{HTTP_API_VERSION}/elasticsearch/"),
658 Router::new()
659 .route("/", routing::get(elasticsearch::handle_get_version))
660 .with_state(log_state),
661 );
662
663 Self { router, ..self }
664 }
665
666 pub fn with_plugins(self, plugins: Plugins) -> Self {
667 Self { plugins, ..self }
668 }
669
670 pub fn with_greptime_config_options(self, opts: String) -> Self {
671 let config_router = HttpServer::route_config(GreptimeOptionsConfigState {
672 greptime_config_options: opts,
673 });
674
675 Self {
676 router: self.router.merge(config_router),
677 ..self
678 }
679 }
680
681 pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self {
682 Self {
683 router: self.router.nest(
684 &format!("/{HTTP_API_VERSION}/jaeger"),
685 HttpServer::route_jaeger(handler),
686 ),
687 ..self
688 }
689 }
690
691 pub fn with_extra_router(self, router: Router) -> Self {
692 Self {
693 router: self.router.merge(router),
694 ..self
695 }
696 }
697
698 pub fn build(self) -> HttpServer {
699 HttpServer {
700 options: self.options,
701 user_provider: self.user_provider,
702 shutdown_tx: Mutex::new(None),
703 plugins: self.plugins,
704 router: StdMutex::new(self.router),
705 bind_addr: None,
706 }
707 }
708}
709
710impl HttpServer {
711 pub fn make_app(&self) -> Router {
713 let mut router = {
714 let router = self.router.lock().unwrap();
715 router.clone()
716 };
717
718 router = router
719 .route(
720 "/health",
721 routing::get(handler::health).post(handler::health),
722 )
723 .route(
724 "/ready",
725 routing::get(handler::health).post(handler::health),
726 );
727
728 router = router.route("/status", routing::get(handler::status));
729
730 #[cfg(feature = "dashboard")]
731 {
732 if !self.options.disable_dashboard {
733 info!("Enable dashboard service at '/dashboard'");
734 router = router.route(
736 "/dashboard",
737 routing::get(|uri: axum::http::uri::Uri| async move {
738 let path = uri.path();
739 let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
740
741 let new_uri = format!("{}/{}", path, query);
742 axum::response::Redirect::permanent(&new_uri)
743 }),
744 );
745
746 router = router
750 .route(
751 "/dashboard/",
752 routing::get(dashboard::static_handler).post(dashboard::static_handler),
753 )
754 .route(
755 "/dashboard/{*x}",
756 routing::get(dashboard::static_handler).post(dashboard::static_handler),
757 );
758 }
759 }
760
761 router = router.route_layer(middleware::from_fn(http_metrics_layer));
763
764 router
765 }
766
767 pub fn build(&self, router: Router) -> Result<Router> {
770 let timeout_layer = if self.options.timeout != Duration::default() {
771 Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout)))
772 } else {
773 info!("HTTP server timeout is disabled");
774 None
775 };
776 let body_limit_layer = if self.options.body_limit != ReadableSize(0) {
777 Some(
778 ServiceBuilder::new()
779 .layer(DefaultBodyLimit::max(self.options.body_limit.0 as usize)),
780 )
781 } else {
782 info!("HTTP server body limit is disabled");
783 None
784 };
785 let cors_layer = if self.options.enable_cors {
786 Some(
787 CorsLayer::new()
788 .allow_methods([
789 Method::GET,
790 Method::POST,
791 Method::PUT,
792 Method::DELETE,
793 Method::HEAD,
794 ])
795 .allow_origin(if self.options.cors_allowed_origins.is_empty() {
796 AllowOrigin::from(Any)
797 } else {
798 AllowOrigin::from(
799 self.options
800 .cors_allowed_origins
801 .iter()
802 .map(|s| {
803 HeaderValue::from_str(s.as_str())
804 .context(InvalidHeaderValueSnafu)
805 })
806 .collect::<Result<Vec<HeaderValue>>>()?,
807 )
808 })
809 .allow_headers(Any),
810 )
811 } else {
812 info!("HTTP server cross-origin is disabled");
813 None
814 };
815
816 Ok(router
817 .layer(
819 ServiceBuilder::new()
820 .layer(TraceLayer::new_for_http().on_failure(()))
823 .option_layer(cors_layer)
824 .option_layer(timeout_layer)
825 .option_layer(body_limit_layer)
826 .layer(middleware::from_fn_with_state(
828 AuthState::new(self.user_provider.clone()),
829 authorize::check_http_auth,
830 ))
831 .layer(middleware::from_fn(hints::extract_hints))
832 .layer(middleware::from_fn(
833 read_preference::extract_read_preference,
834 )),
835 )
836 .nest(
838 "/debug",
839 Router::new()
840 .route("/log_level", routing::post(dyn_log::dyn_log_handler))
842 .nest(
843 "/prof",
844 Router::new()
845 .route("/cpu", routing::post(pprof::pprof_handler))
846 .route("/mem", routing::post(mem_prof::mem_prof_handler)),
847 ),
848 ))
849 }
850
851 fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
852 Router::new()
853 .route("/metrics", routing::get(handler::metrics))
854 .with_state(metrics_handler)
855 }
856
857 fn route_loki<S>(log_state: LogState) -> Router<S> {
858 Router::new()
859 .route("/api/v1/push", routing::post(loki::loki_ingest))
860 .layer(
861 ServiceBuilder::new()
862 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
863 )
864 .with_state(log_state)
865 }
866
867 fn route_elasticsearch<S>(log_state: LogState) -> Router<S> {
868 Router::new()
869 .route(
871 "/",
872 routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
873 )
874 .route("/", routing::get(elasticsearch::handle_get_version))
876 .route("/_license", routing::get(elasticsearch::handle_get_license))
878 .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
879 .route(
880 "/{index}/_bulk",
881 routing::post(elasticsearch::handle_bulk_api_with_index),
882 )
883 .route(
885 "/_ilm/policy/{*path}",
886 routing::any((
887 HttpStatusCode::OK,
888 elasticsearch::elasticsearch_headers(),
889 axum::Json(serde_json::json!({})),
890 )),
891 )
892 .route(
894 "/_index_template/{*path}",
895 routing::any((
896 HttpStatusCode::OK,
897 elasticsearch::elasticsearch_headers(),
898 axum::Json(serde_json::json!({})),
899 )),
900 )
901 .route(
904 "/_ingest/{*path}",
905 routing::any((
906 HttpStatusCode::OK,
907 elasticsearch::elasticsearch_headers(),
908 axum::Json(serde_json::json!({})),
909 )),
910 )
911 .route(
914 "/_nodes/{*path}",
915 routing::any((
916 HttpStatusCode::OK,
917 elasticsearch::elasticsearch_headers(),
918 axum::Json(serde_json::json!({})),
919 )),
920 )
921 .route(
924 "/logstash/{*path}",
925 routing::any((
926 HttpStatusCode::OK,
927 elasticsearch::elasticsearch_headers(),
928 axum::Json(serde_json::json!({})),
929 )),
930 )
931 .route(
932 "/_logstash/{*path}",
933 routing::any((
934 HttpStatusCode::OK,
935 elasticsearch::elasticsearch_headers(),
936 axum::Json(serde_json::json!({})),
937 )),
938 )
939 .layer(ServiceBuilder::new().layer(RequestDecompressionLayer::new()))
940 .with_state(log_state)
941 }
942
943 #[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")]
944 fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
945 Router::new()
946 .route("/logs", routing::post(event::log_ingester))
947 .route(
948 "/pipelines/{pipeline_name}",
949 routing::get(event::query_pipeline),
950 )
951 .route(
952 "/pipelines/{pipeline_name}",
953 routing::post(event::add_pipeline),
954 )
955 .route(
956 "/pipelines/{pipeline_name}",
957 routing::delete(event::delete_pipeline),
958 )
959 .route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
960 .layer(
961 ServiceBuilder::new()
962 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
963 )
964 .with_state(log_state)
965 }
966
967 fn route_pipelines<S>(log_state: LogState) -> Router<S> {
968 Router::new()
969 .route("/ingest", routing::post(event::log_ingester))
970 .route(
971 "/pipelines/{pipeline_name}",
972 routing::get(event::query_pipeline),
973 )
974 .route(
975 "/pipelines/{pipeline_name}",
976 routing::post(event::add_pipeline),
977 )
978 .route(
979 "/pipelines/{pipeline_name}",
980 routing::delete(event::delete_pipeline),
981 )
982 .route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun))
983 .layer(
984 ServiceBuilder::new()
985 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
986 )
987 .with_state(log_state)
988 }
989
990 fn route_sql<S>(api_state: ApiState) -> Router<S> {
991 Router::new()
992 .route("/sql", routing::get(handler::sql).post(handler::sql))
993 .route(
994 "/sql/parse",
995 routing::get(handler::sql_parse).post(handler::sql_parse),
996 )
997 .route(
998 "/promql",
999 routing::get(handler::promql).post(handler::promql),
1000 )
1001 .with_state(api_state)
1002 }
1003
1004 fn route_logs<S>(log_handler: LogQueryHandlerRef) -> Router<S> {
1005 Router::new()
1006 .route("/logs", routing::get(logs::logs).post(logs::logs))
1007 .with_state(log_handler)
1008 }
1009
1010 fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
1014 Router::new()
1015 .route(
1016 "/format_query",
1017 routing::post(format_query).get(format_query),
1018 )
1019 .route("/status/buildinfo", routing::get(build_info_query))
1020 .route("/query", routing::post(instant_query).get(instant_query))
1021 .route("/query_range", routing::post(range_query).get(range_query))
1022 .route("/labels", routing::post(labels_query).get(labels_query))
1023 .route("/series", routing::post(series_query).get(series_query))
1024 .route("/parse_query", routing::post(parse_query).get(parse_query))
1025 .route(
1026 "/label/{label_name}/values",
1027 routing::get(label_values_query),
1028 )
1029 .layer(ServiceBuilder::new().layer(CompressionLayer::new()))
1030 .with_state(prometheus_handler)
1031 }
1032
1033 fn route_prom<S>(state: PromStoreState) -> Router<S> {
1039 Router::new()
1040 .route("/read", routing::post(prom_store::remote_read))
1041 .route("/write", routing::post(prom_store::remote_write))
1042 .with_state(state)
1043 }
1044
1045 fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
1046 Router::new()
1047 .route("/write", routing::post(influxdb_write_v1))
1048 .route("/api/v2/write", routing::post(influxdb_write_v2))
1049 .layer(
1050 ServiceBuilder::new()
1051 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1052 )
1053 .route("/ping", routing::get(influxdb_ping))
1054 .route("/health", routing::get(influxdb_health))
1055 .with_state(influxdb_handler)
1056 }
1057
1058 fn route_opentsdb<S>(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
1059 Router::new()
1060 .route("/api/put", routing::post(opentsdb::put))
1061 .with_state(opentsdb_handler)
1062 }
1063
1064 fn route_otlp<S>(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
1065 Router::new()
1066 .route("/v1/metrics", routing::post(otlp::metrics))
1067 .route("/v1/traces", routing::post(otlp::traces))
1068 .route("/v1/logs", routing::post(otlp::logs))
1069 .layer(
1070 ServiceBuilder::new()
1071 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1072 )
1073 .with_state(otlp_handler)
1074 }
1075
1076 fn route_config<S>(state: GreptimeOptionsConfigState) -> Router<S> {
1077 Router::new()
1078 .route("/config", routing::get(handler::config))
1079 .with_state(state)
1080 }
1081
1082 fn route_jaeger<S>(handler: JaegerQueryHandlerRef) -> Router<S> {
1083 Router::new()
1084 .route("/api/services", routing::get(jaeger::handle_get_services))
1085 .route(
1086 "/api/services/{service_name}/operations",
1087 routing::get(jaeger::handle_get_operations_by_service),
1088 )
1089 .route(
1090 "/api/operations",
1091 routing::get(jaeger::handle_get_operations),
1092 )
1093 .route("/api/traces", routing::get(jaeger::handle_find_traces))
1094 .route(
1095 "/api/traces/{trace_id}",
1096 routing::get(jaeger::handle_get_trace),
1097 )
1098 .with_state(handler)
1099 }
1100}
1101
1102pub const HTTP_SERVER: &str = "HTTP_SERVER";
1103
1104#[async_trait]
1105impl Server for HttpServer {
1106 async fn shutdown(&self) -> Result<()> {
1107 let mut shutdown_tx = self.shutdown_tx.lock().await;
1108 if let Some(tx) = shutdown_tx.take() {
1109 if tx.send(()).is_err() {
1110 info!("Receiver dropped, the HTTP server has already existed");
1111 }
1112 }
1113 info!("Shutdown HTTP server");
1114
1115 Ok(())
1116 }
1117
1118 async fn start(&mut self, listening: SocketAddr) -> Result<()> {
1119 let (tx, rx) = oneshot::channel();
1120 let serve = {
1121 let mut shutdown_tx = self.shutdown_tx.lock().await;
1122 ensure!(
1123 shutdown_tx.is_none(),
1124 AlreadyStartedSnafu { server: "HTTP" }
1125 );
1126
1127 let mut app = self.make_app();
1128 if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
1129 app = configurator.config_http(app);
1130 }
1131 let app = self.build(app)?;
1132 let listener = tokio::net::TcpListener::bind(listening)
1133 .await
1134 .context(AddressBindSnafu { addr: listening })?
1135 .tap_io(|tcp_stream| {
1136 if let Err(e) = tcp_stream.set_nodelay(true) {
1137 error!(e; "Failed to set TCP_NODELAY on incoming connection");
1138 }
1139 });
1140 let serve = axum::serve(listener, app.into_make_service());
1141
1142 *shutdown_tx = Some(tx);
1159
1160 serve
1161 };
1162 let listening = serve.local_addr().context(InternalIoSnafu)?;
1163 info!("HTTP server is bound to {}", listening);
1164
1165 common_runtime::spawn_global(async move {
1166 if let Err(e) = serve
1167 .with_graceful_shutdown(rx.map(drop))
1168 .await
1169 .context(InternalIoSnafu)
1170 {
1171 error!(e; "Failed to shutdown http server");
1172 }
1173 });
1174
1175 self.bind_addr = Some(listening);
1176 Ok(())
1177 }
1178
1179 fn name(&self) -> &str {
1180 HTTP_SERVER
1181 }
1182
1183 fn bind_addr(&self) -> Option<SocketAddr> {
1184 self.bind_addr
1185 }
1186}
1187
1188#[cfg(test)]
1189mod test {
1190 use std::future::pending;
1191 use std::io::Cursor;
1192 use std::sync::Arc;
1193
1194 use arrow_ipc::reader::FileReader;
1195 use arrow_schema::DataType;
1196 use axum::handler::Handler;
1197 use axum::http::StatusCode;
1198 use axum::routing::get;
1199 use common_query::Output;
1200 use common_recordbatch::RecordBatches;
1201 use datafusion_expr::LogicalPlan;
1202 use datatypes::prelude::*;
1203 use datatypes::schema::{ColumnSchema, Schema};
1204 use datatypes::vectors::{StringVector, UInt32Vector};
1205 use header::constants::GREPTIME_DB_HEADER_TIMEOUT;
1206 use query::parser::PromQuery;
1207 use query::query_engine::DescribeResult;
1208 use session::context::QueryContextRef;
1209 use tokio::sync::mpsc;
1210 use tokio::time::Instant;
1211
1212 use super::*;
1213 use crate::error::Error;
1214 use crate::http::test_helpers::TestClient;
1215 use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
1216
1217 struct DummyInstance {
1218 _tx: mpsc::Sender<(String, Vec<u8>)>,
1219 }
1220
1221 #[async_trait]
1222 impl SqlQueryHandler for DummyInstance {
1223 type Error = Error;
1224
1225 async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
1226 unimplemented!()
1227 }
1228
1229 async fn do_promql_query(
1230 &self,
1231 _: &PromQuery,
1232 _: QueryContextRef,
1233 ) -> Vec<std::result::Result<Output, Self::Error>> {
1234 unimplemented!()
1235 }
1236
1237 async fn do_exec_plan(
1238 &self,
1239 _plan: LogicalPlan,
1240 _query_ctx: QueryContextRef,
1241 ) -> std::result::Result<Output, Self::Error> {
1242 unimplemented!()
1243 }
1244
1245 async fn do_describe(
1246 &self,
1247 _stmt: sql::statements::statement::Statement,
1248 _query_ctx: QueryContextRef,
1249 ) -> Result<Option<DescribeResult>> {
1250 unimplemented!()
1251 }
1252
1253 async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
1254 Ok(true)
1255 }
1256 }
1257
1258 fn timeout() -> DynamicTimeoutLayer {
1259 DynamicTimeoutLayer::new(Duration::from_millis(10))
1260 }
1261
1262 async fn forever() {
1263 pending().await
1264 }
1265
1266 fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
1267 make_test_app_custom(tx, HttpOptions::default())
1268 }
1269
1270 fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
1271 let instance = Arc::new(DummyInstance { _tx: tx });
1272 let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
1273 let server = HttpServerBuilder::new(options)
1274 .with_sql_handler(sql_instance)
1275 .build();
1276 server.build(server.make_app()).unwrap().route(
1277 "/test/timeout",
1278 get(forever.layer(ServiceBuilder::new().layer(timeout()))),
1279 )
1280 }
1281
1282 #[tokio::test]
1283 pub async fn test_cors() {
1284 let (tx, _rx) = mpsc::channel(100);
1286 let app = make_test_app(tx);
1287 let client = TestClient::new(app).await;
1288
1289 let res = client.get("/health").send().await;
1290
1291 assert_eq!(res.status(), StatusCode::OK);
1292 assert_eq!(
1293 res.headers()
1294 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1295 .expect("expect cors header origin"),
1296 "*"
1297 );
1298
1299 let res = client
1300 .options("/health")
1301 .header("Access-Control-Request-Headers", "x-greptime-auth")
1302 .header("Access-Control-Request-Method", "DELETE")
1303 .header("Origin", "https://example.com")
1304 .send()
1305 .await;
1306 assert_eq!(res.status(), StatusCode::OK);
1307 assert_eq!(
1308 res.headers()
1309 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1310 .expect("expect cors header origin"),
1311 "*"
1312 );
1313 assert_eq!(
1314 res.headers()
1315 .get(http::header::ACCESS_CONTROL_ALLOW_HEADERS)
1316 .expect("expect cors header headers"),
1317 "*"
1318 );
1319 assert_eq!(
1320 res.headers()
1321 .get(http::header::ACCESS_CONTROL_ALLOW_METHODS)
1322 .expect("expect cors header methods"),
1323 "GET,POST,PUT,DELETE,HEAD"
1324 );
1325 }
1326
1327 #[tokio::test]
1328 pub async fn test_cors_custom_origins() {
1329 let (tx, _rx) = mpsc::channel(100);
1331 let origin = "https://example.com";
1332
1333 let options = HttpOptions {
1334 cors_allowed_origins: vec![origin.to_string()],
1335 ..Default::default()
1336 };
1337
1338 let app = make_test_app_custom(tx, options);
1339 let client = TestClient::new(app).await;
1340
1341 let res = client.get("/health").header("Origin", origin).send().await;
1342
1343 assert_eq!(res.status(), StatusCode::OK);
1344 assert_eq!(
1345 res.headers()
1346 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1347 .expect("expect cors header origin"),
1348 origin
1349 );
1350
1351 let res = client
1352 .get("/health")
1353 .header("Origin", "https://notallowed.com")
1354 .send()
1355 .await;
1356
1357 assert_eq!(res.status(), StatusCode::OK);
1358 assert!(!res
1359 .headers()
1360 .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1361 }
1362
1363 #[tokio::test]
1364 pub async fn test_cors_disabled() {
1365 let (tx, _rx) = mpsc::channel(100);
1367
1368 let options = HttpOptions {
1369 enable_cors: false,
1370 ..Default::default()
1371 };
1372
1373 let app = make_test_app_custom(tx, options);
1374 let client = TestClient::new(app).await;
1375
1376 let res = client.get("/health").send().await;
1377
1378 assert_eq!(res.status(), StatusCode::OK);
1379 assert!(!res
1380 .headers()
1381 .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1382 }
1383
1384 #[test]
1385 fn test_http_options_default() {
1386 let default = HttpOptions::default();
1387 assert_eq!("127.0.0.1:4000".to_string(), default.addr);
1388 assert_eq!(Duration::from_secs(0), default.timeout)
1389 }
1390
1391 #[tokio::test]
1392 async fn test_http_server_request_timeout() {
1393 common_telemetry::init_default_ut_logging();
1394
1395 let (tx, _rx) = mpsc::channel(100);
1396 let app = make_test_app(tx);
1397 let client = TestClient::new(app).await;
1398 let res = client.get("/test/timeout").send().await;
1399 assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1400
1401 let now = Instant::now();
1402 let res = client
1403 .get("/test/timeout")
1404 .header(GREPTIME_DB_HEADER_TIMEOUT, "20ms")
1405 .send()
1406 .await;
1407 assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1408 let elapsed = now.elapsed();
1409 assert!(elapsed > Duration::from_millis(15));
1410
1411 tokio::time::timeout(
1412 Duration::from_millis(15),
1413 client
1414 .get("/test/timeout")
1415 .header(GREPTIME_DB_HEADER_TIMEOUT, "0s")
1416 .send(),
1417 )
1418 .await
1419 .unwrap_err();
1420
1421 tokio::time::timeout(
1422 Duration::from_millis(15),
1423 client
1424 .get("/test/timeout")
1425 .header(
1426 GREPTIME_DB_HEADER_TIMEOUT,
1427 humantime::format_duration(Duration::default()).to_string(),
1428 )
1429 .send(),
1430 )
1431 .await
1432 .unwrap_err();
1433 }
1434
1435 #[tokio::test]
1436 async fn test_schema_for_empty_response() {
1437 let column_schemas = vec![
1438 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1439 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1440 ];
1441 let schema = Arc::new(Schema::new(column_schemas));
1442
1443 let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
1444 let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1445
1446 let json_resp = GreptimedbV1Response::from_output(outputs).await;
1447 if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
1448 let json_output = &json_resp.output[0];
1449 if let GreptimeQueryOutput::Records(r) = json_output {
1450 assert_eq!(r.num_rows(), 0);
1451 assert_eq!(r.num_cols(), 2);
1452 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1453 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1454 } else {
1455 panic!("invalid output type");
1456 }
1457 } else {
1458 panic!("invalid format")
1459 }
1460 }
1461
1462 #[tokio::test]
1463 async fn test_recordbatches_conversion() {
1464 let column_schemas = vec![
1465 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1466 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1467 ];
1468 let schema = Arc::new(Schema::new(column_schemas));
1469 let columns: Vec<VectorRef> = vec![
1470 Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
1471 Arc::new(StringVector::from(vec![
1472 None,
1473 Some("hello"),
1474 Some("greptime"),
1475 None,
1476 ])),
1477 ];
1478 let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
1479
1480 for format in [
1481 ResponseFormat::GreptimedbV1,
1482 ResponseFormat::InfluxdbV1,
1483 ResponseFormat::Csv,
1484 ResponseFormat::Table,
1485 ResponseFormat::Arrow,
1486 ResponseFormat::Json,
1487 ] {
1488 let recordbatches =
1489 RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
1490 let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1491 let json_resp = match format {
1492 ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
1493 ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
1494 ResponseFormat::Table => TableResponse::from_output(outputs).await,
1495 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
1496 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
1497 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
1498 };
1499
1500 match json_resp {
1501 HttpResponse::GreptimedbV1(resp) => {
1502 let json_output = &resp.output[0];
1503 if let GreptimeQueryOutput::Records(r) = json_output {
1504 assert_eq!(r.num_rows(), 4);
1505 assert_eq!(r.num_cols(), 2);
1506 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1507 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1508 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1509 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1510 } else {
1511 panic!("invalid output type");
1512 }
1513 }
1514 HttpResponse::InfluxdbV1(resp) => {
1515 let json_output = &resp.results()[0];
1516 assert_eq!(json_output.num_rows(), 4);
1517 assert_eq!(json_output.num_cols(), 2);
1518 assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
1519 assert_eq!(
1520 json_output.series[0].values[0][0],
1521 serde_json::Value::from(1)
1522 );
1523 assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
1524 }
1525 HttpResponse::Csv(resp) => {
1526 let output = &resp.output()[0];
1527 if let GreptimeQueryOutput::Records(r) = output {
1528 assert_eq!(r.num_rows(), 4);
1529 assert_eq!(r.num_cols(), 2);
1530 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1531 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1532 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1533 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1534 } else {
1535 panic!("invalid output type");
1536 }
1537 }
1538
1539 HttpResponse::Table(resp) => {
1540 let output = &resp.output()[0];
1541 if let GreptimeQueryOutput::Records(r) = output {
1542 assert_eq!(r.num_rows(), 4);
1543 assert_eq!(r.num_cols(), 2);
1544 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1545 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1546 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1547 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1548 } else {
1549 panic!("invalid output type");
1550 }
1551 }
1552
1553 HttpResponse::Arrow(resp) => {
1554 let output = resp.data;
1555 let mut reader =
1556 FileReader::try_new(Cursor::new(output), None).expect("Arrow reader error");
1557 let schema = reader.schema();
1558 assert_eq!(schema.fields[0].name(), "numbers");
1559 assert_eq!(schema.fields[0].data_type(), &DataType::UInt32);
1560 assert_eq!(schema.fields[1].name(), "strings");
1561 assert_eq!(schema.fields[1].data_type(), &DataType::Utf8);
1562
1563 let rb = reader.next().unwrap().expect("read record batch failed");
1564 assert_eq!(rb.num_columns(), 2);
1565 assert_eq!(rb.num_rows(), 4);
1566 }
1567
1568 HttpResponse::Json(resp) => {
1569 let output = &resp.output()[0];
1570 if let GreptimeQueryOutput::Records(r) = output {
1571 assert_eq!(r.num_rows(), 4);
1572 assert_eq!(r.num_cols(), 2);
1573 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1574 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1575 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1576 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1577 } else {
1578 panic!("invalid output type");
1579 }
1580 }
1581
1582 HttpResponse::Error(err) => unreachable!("{err:?}"),
1583 }
1584 }
1585 }
1586}