1use std::collections::HashMap;
16use std::fmt::Display;
17use std::net::SocketAddr;
18use std::sync::Mutex as StdMutex;
19use std::time::Duration;
20
21use async_trait::async_trait;
22use auth::UserProviderRef;
23use axum::extract::DefaultBodyLimit;
24use axum::http::StatusCode as HttpStatusCode;
25use axum::response::{IntoResponse, Response};
26use axum::serve::ListenerExt;
27use axum::{middleware, routing, Router};
28use common_base::readable_size::ReadableSize;
29use common_base::Plugins;
30use common_recordbatch::RecordBatch;
31use common_telemetry::{error, info};
32use common_time::timestamp::TimeUnit;
33use common_time::Timestamp;
34use datatypes::data_type::DataType;
35use datatypes::schema::SchemaRef;
36use datatypes::value::transform_value_ref_to_json_value;
37use event::{LogState, LogValidatorRef};
38use futures::FutureExt;
39use http::{HeaderValue, Method};
40use serde::{Deserialize, Serialize};
41use serde_json::Value;
42use snafu::{ensure, ResultExt};
43use tokio::sync::oneshot::{self, Sender};
44use tokio::sync::Mutex;
45use tower::ServiceBuilder;
46use tower_http::compression::CompressionLayer;
47use tower_http::cors::{AllowOrigin, Any, CorsLayer};
48use tower_http::decompression::RequestDecompressionLayer;
49use tower_http::trace::TraceLayer;
50
51use self::authorize::AuthState;
52use self::result::table_result::TableResponse;
53use crate::configurator::ConfiguratorRef;
54use crate::elasticsearch;
55use crate::error::{
56 AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
57 ToJsonSnafu,
58};
59use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
60use crate::http::prom_store::PromStoreState;
61use crate::http::prometheus::{
62 build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
63 range_query, series_query,
64};
65use crate::http::result::arrow_result::ArrowResponse;
66use crate::http::result::csv_result::CsvResponse;
67use crate::http::result::error_result::ErrorResponse;
68use crate::http::result::greptime_result_v1::GreptimedbV1Response;
69use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
70use crate::http::result::json_result::JsonResponse;
71use crate::interceptor::LogIngestInterceptorRef;
72use crate::metrics::http_metrics_layer;
73use crate::metrics_handler::MetricsHandler;
74use crate::prometheus_handler::PrometheusHandlerRef;
75use crate::query_handler::sql::ServerSqlQueryHandlerRef;
76use crate::query_handler::{
77 InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
78 OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
79 PromStoreProtocolHandlerRef,
80};
81use crate::server::Server;
82
83pub mod authorize;
84#[cfg(feature = "dashboard")]
85mod dashboard;
86pub mod dyn_log;
87pub mod event;
88mod extractor;
89pub mod handler;
90pub mod header;
91pub mod influxdb;
92pub mod jaeger;
93pub mod logs;
94pub mod loki;
95pub mod mem_prof;
96pub mod opentsdb;
97pub mod otlp;
98pub mod pprof;
99pub mod prom_store;
100pub mod prometheus;
101pub mod result;
102mod timeout;
103
104pub(crate) use timeout::DynamicTimeoutLayer;
105
106mod hints;
107mod read_preference;
108#[cfg(any(test, feature = "testing"))]
109pub mod test_helpers;
110
/// Version segment used by the versioned HTTP API.
pub const HTTP_API_VERSION: &str = "v1";
/// Path prefix (with trailing slash) for all versioned HTTP API routes.
pub const HTTP_API_PREFIX: &str = "/v1/";
/// Default request body size limit (64 MB) applied when none is configured.
const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64);

/// Header name carrying GreptimeDB authentication credentials.
pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth";

/// Routes that are reachable without authentication (InfluxDB-compatible
/// liveness probes).
pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
121
/// The HTTP server: holds the composed axum [`Router`], shutdown signalling,
/// and the options it was built with (see [`HttpServerBuilder`]).
#[derive(Default)]
pub struct HttpServer {
    // Router kept behind a std mutex; cloned out in `make_app`.
    router: StdMutex<Router>,
    // Sender that triggers graceful shutdown; `Some` once `start` succeeds.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    // Optional auth provider; `None` means no authentication.
    user_provider: Option<UserProviderRef>,

    // Plugin registry consulted at startup (e.g. for `ConfiguratorRef`).
    plugins: Plugins,

    // Server configuration (address, timeout, body limit, CORS, ...).
    options: HttpOptions,
    // Actual bound address, recorded after `start` succeeds.
    bind_addr: Option<SocketAddr>,
}
135
/// User-facing configuration for the HTTP server.
///
/// All fields have serde defaults, so partial configs deserialize cleanly.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpOptions {
    /// Address to bind, e.g. "127.0.0.1:4000".
    pub addr: String,

    /// Request timeout; a zero duration disables the timeout layer.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,

    /// Disables the bundled dashboard UI; not read from config files.
    #[serde(skip)]
    pub disable_dashboard: bool,

    /// Maximum request body size; zero disables the limit layer.
    pub body_limit: ReadableSize,

    /// Strict-mode toggle; presumably forwarded to protocol handler state
    /// (see `with_prom_handler`) — confirm at call sites.
    pub is_strict_mode: bool,

    /// Explicit CORS origins; an empty list allows any origin.
    pub cors_allowed_origins: Vec<String>,

    /// Whether the CORS layer is installed at all.
    pub enable_cors: bool,
}
155
156impl Default for HttpOptions {
157 fn default() -> Self {
158 Self {
159 addr: "127.0.0.1:4000".to_string(),
160 timeout: Duration::from_secs(0),
161 disable_dashboard: false,
162 body_limit: DEFAULT_BODY_LIMIT,
163 is_strict_mode: false,
164 cors_allowed_origins: Vec::new(),
165 enable_cors: true,
166 }
167 }
168}
169
/// A single column descriptor in a query response: the column name plus the
/// string name of its data type.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ColumnSchema {
    name: String,
    data_type: String,
}
175
176impl ColumnSchema {
177 pub fn new(name: String, data_type: String) -> ColumnSchema {
178 ColumnSchema { name, data_type }
179 }
180}
181
/// Ordered set of column descriptors for a query response.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct OutputSchema {
    column_schemas: Vec<ColumnSchema>,
}
186
187impl OutputSchema {
188 pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
189 OutputSchema {
190 column_schemas: columns,
191 }
192 }
193}
194
impl From<SchemaRef> for OutputSchema {
    /// Projects the internal schema into the wire format, keeping each
    /// column's name and its data type's display name.
    fn from(schema: SchemaRef) -> OutputSchema {
        OutputSchema {
            column_schemas: schema
                .column_schemas()
                .iter()
                .map(|cs| ColumnSchema {
                    name: cs.name.clone(),
                    data_type: cs.data_type.name(),
                })
                .collect(),
        }
    }
}
209
/// Row-oriented query result payload for the HTTP API.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct HttpRecordsOutput {
    schema: OutputSchema,
    rows: Vec<Vec<Value>>,
    // Row count of `rows`; kept in sync by `try_new` and `process_with_limit`.
    // Defaults to 0 when deserializing payloads that lack the field.
    #[serde(default)]
    total_rows: usize,

    // Per-query metrics; omitted from the serialized payload when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    #[serde(default)]
    metrics: HashMap<String, Value>,
}
224
impl HttpRecordsOutput {
    /// Number of rows currently held in `rows`.
    pub fn num_rows(&self) -> usize {
        self.rows.len()
    }

    /// Number of columns in the schema.
    pub fn num_cols(&self) -> usize {
        self.schema.column_schemas.len()
    }

    /// Borrow of the output schema.
    pub fn schema(&self) -> &OutputSchema {
        &self.schema
    }

    /// Borrow of the row data (each row is a vector of JSON values).
    pub fn rows(&self) -> &Vec<Vec<Value>> {
        &self.rows
    }
}
242
impl HttpRecordsOutput {
    /// Converts a set of record batches (all sharing `schema`) into
    /// row-oriented JSON output.
    ///
    /// Returns an error if any cell cannot be converted to a JSON value.
    pub fn try_new(
        schema: SchemaRef,
        recordbatches: Vec<RecordBatch>,
    ) -> std::result::Result<HttpRecordsOutput, Error> {
        if recordbatches.is_empty() {
            Ok(HttpRecordsOutput {
                schema: OutputSchema::from(schema),
                rows: vec![],
                total_rows: 0,
                metrics: Default::default(),
            })
        } else {
            // Pre-size every row so the column-major -> row-major transpose
            // below can push values without reallocating.
            let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
            let mut rows = Vec::with_capacity(num_rows);
            let schemas = schema.column_schemas();
            let num_cols = schema.column_schemas().len();
            rows.resize_with(num_rows, || Vec::with_capacity(num_cols));

            // Transpose each batch's columns into the global row vector;
            // `finished_row_cursor` is the row offset of the current batch.
            let mut finished_row_cursor = 0;
            for recordbatch in recordbatches {
                for (col_idx, col) in recordbatch.columns().iter().enumerate() {
                    // Shadowed: the per-column schema for value conversion.
                    let schema = &schemas[col_idx];
                    for row_idx in 0..recordbatch.num_rows() {
                        let value = transform_value_ref_to_json_value(col.get_ref(row_idx), schema)
                            .context(ToJsonSnafu)?;
                        rows[row_idx + finished_row_cursor].push(value);
                    }
                }
                finished_row_cursor += recordbatch.num_rows();
            }

            Ok(HttpRecordsOutput {
                schema: OutputSchema::from(schema),
                total_rows: rows.len(),
                rows,
                metrics: Default::default(),
            })
        }
    }
}
285
/// Result of one query statement: either a DML affected-row count or a
/// record set. Serialized with lowercase variant tags.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum GreptimeQueryOutput {
    AffectedRows(usize),
    Records(HttpRecordsOutput),
}
292
/// Wire format requested for query responses; defaults to the GreptimeDB v1
/// JSON envelope.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
    Arrow,
    Csv,
    Table,
    #[default]
    GreptimedbV1,
    InfluxdbV1,
    Json,
}
304
305impl ResponseFormat {
306 pub fn parse(s: &str) -> Option<Self> {
307 match s {
308 "arrow" => Some(ResponseFormat::Arrow),
309 "csv" => Some(ResponseFormat::Csv),
310 "table" => Some(ResponseFormat::Table),
311 "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
312 "influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
313 "json" => Some(ResponseFormat::Json),
314 _ => None,
315 }
316 }
317
318 pub fn as_str(&self) -> &'static str {
319 match self {
320 ResponseFormat::Arrow => "arrow",
321 ResponseFormat::Csv => "csv",
322 ResponseFormat::Table => "table",
323 ResponseFormat::GreptimedbV1 => "greptimedb_v1",
324 ResponseFormat::InfluxdbV1 => "influxdb_v1",
325 ResponseFormat::Json => "json",
326 }
327 }
328}
329
/// Timestamp precision selector (InfluxDB-style `epoch` parameter).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Epoch {
    Nanosecond,
    Microsecond,
    Millisecond,
    Second,
}
337
338impl Epoch {
339 pub fn parse(s: &str) -> Option<Epoch> {
340 match s {
345 "ns" => Some(Epoch::Nanosecond),
346 "u" | "µ" => Some(Epoch::Microsecond),
347 "ms" => Some(Epoch::Millisecond),
348 "s" => Some(Epoch::Second),
349 _ => None, }
351 }
352
353 pub fn convert_timestamp(&self, ts: Timestamp) -> Option<Timestamp> {
354 match self {
355 Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond),
356 Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond),
357 Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond),
358 Epoch::Second => ts.convert_to(TimeUnit::Second),
359 }
360 }
361}
362
363impl Display for Epoch {
364 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365 match self {
366 Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"),
367 Epoch::Microsecond => write!(f, "Epoch::Microsecond"),
368 Epoch::Millisecond => write!(f, "Epoch::Millisecond"),
369 Epoch::Second => write!(f, "Epoch::Second"),
370 }
371 }
372}
373
/// Union of every concrete HTTP response envelope the server can produce.
#[derive(Serialize, Deserialize, Debug)]
pub enum HttpResponse {
    Arrow(ArrowResponse),
    Csv(CsvResponse),
    Table(TableResponse),
    Error(ErrorResponse),
    GreptimedbV1(GreptimedbV1Response),
    InfluxdbV1(InfluxdbV1Response),
    Json(JsonResponse),
}
384
385impl HttpResponse {
386 pub fn with_execution_time(self, execution_time: u64) -> Self {
387 match self {
388 HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
389 HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
390 HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
391 HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
392 HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
393 HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
394 HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
395 }
396 }
397
398 pub fn with_limit(self, limit: usize) -> Self {
399 match self {
400 HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
401 HttpResponse::Table(resp) => resp.with_limit(limit).into(),
402 HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
403 HttpResponse::Json(resp) => resp.with_limit(limit).into(),
404 _ => self,
405 }
406 }
407}
408
409pub fn process_with_limit(
410 mut outputs: Vec<GreptimeQueryOutput>,
411 limit: usize,
412) -> Vec<GreptimeQueryOutput> {
413 outputs
414 .drain(..)
415 .map(|data| match data {
416 GreptimeQueryOutput::Records(mut records) => {
417 if records.rows.len() > limit {
418 records.rows.truncate(limit);
419 records.total_rows = limit;
420 }
421 GreptimeQueryOutput::Records(records)
422 }
423 _ => data,
424 })
425 .collect()
426}
427
428impl IntoResponse for HttpResponse {
429 fn into_response(self) -> Response {
430 match self {
431 HttpResponse::Arrow(resp) => resp.into_response(),
432 HttpResponse::Csv(resp) => resp.into_response(),
433 HttpResponse::Table(resp) => resp.into_response(),
434 HttpResponse::GreptimedbV1(resp) => resp.into_response(),
435 HttpResponse::InfluxdbV1(resp) => resp.into_response(),
436 HttpResponse::Json(resp) => resp.into_response(),
437 HttpResponse::Error(resp) => resp.into_response(),
438 }
439 }
440}
441
// One `From` impl per concrete response type, so handler code can convert any
// of them into `HttpResponse` with a plain `.into()`.
impl From<ArrowResponse> for HttpResponse {
    fn from(value: ArrowResponse) -> Self {
        HttpResponse::Arrow(value)
    }
}

impl From<CsvResponse> for HttpResponse {
    fn from(value: CsvResponse) -> Self {
        HttpResponse::Csv(value)
    }
}

impl From<TableResponse> for HttpResponse {
    fn from(value: TableResponse) -> Self {
        HttpResponse::Table(value)
    }
}

impl From<ErrorResponse> for HttpResponse {
    fn from(value: ErrorResponse) -> Self {
        HttpResponse::Error(value)
    }
}

impl From<GreptimedbV1Response> for HttpResponse {
    fn from(value: GreptimedbV1Response) -> Self {
        HttpResponse::GreptimedbV1(value)
    }
}

impl From<InfluxdbV1Response> for HttpResponse {
    fn from(value: InfluxdbV1Response) -> Self {
        HttpResponse::InfluxdbV1(value)
    }
}

impl From<JsonResponse> for HttpResponse {
    fn from(value: JsonResponse) -> Self {
        HttpResponse::Json(value)
    }
}
483
/// Shared state for the SQL/PromQL query routes.
#[derive(Clone)]
pub struct ApiState {
    pub sql_handler: ServerSqlQueryHandlerRef,
}
488
/// State for the "/config" route: the rendered server configuration string.
#[derive(Clone)]
pub struct GreptimeOptionsConfigState {
    pub greptime_config_options: String,
}
493
/// Fluent builder that assembles an `HttpServer` route group by route group.
#[derive(Default)]
pub struct HttpServerBuilder {
    options: HttpOptions,
    plugins: Plugins,
    user_provider: Option<UserProviderRef>,
    router: Router,
}
501
502impl HttpServerBuilder {
503 pub fn new(options: HttpOptions) -> Self {
504 Self {
505 options,
506 plugins: Plugins::default(),
507 user_provider: None,
508 router: Router::new(),
509 }
510 }
511
512 pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
513 let sql_router = HttpServer::route_sql(ApiState { sql_handler });
514
515 Self {
516 router: self
517 .router
518 .nest(&format!("/{HTTP_API_VERSION}"), sql_router),
519 ..self
520 }
521 }
522
523 pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self {
524 let logs_router = HttpServer::route_logs(logs_handler);
525
526 Self {
527 router: self
528 .router
529 .nest(&format!("/{HTTP_API_VERSION}"), logs_router),
530 ..self
531 }
532 }
533
534 pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
535 Self {
536 router: self.router.nest(
537 &format!("/{HTTP_API_VERSION}/opentsdb"),
538 HttpServer::route_opentsdb(handler),
539 ),
540 ..self
541 }
542 }
543
544 pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self {
545 Self {
546 router: self.router.nest(
547 &format!("/{HTTP_API_VERSION}/influxdb"),
548 HttpServer::route_influxdb(handler),
549 ),
550 ..self
551 }
552 }
553
554 pub fn with_prom_handler(
555 self,
556 handler: PromStoreProtocolHandlerRef,
557 prom_store_with_metric_engine: bool,
558 is_strict_mode: bool,
559 ) -> Self {
560 let state = PromStoreState {
561 prom_store_handler: handler,
562 prom_store_with_metric_engine,
563 is_strict_mode,
564 };
565
566 Self {
567 router: self.router.nest(
568 &format!("/{HTTP_API_VERSION}/prometheus"),
569 HttpServer::route_prom(state),
570 ),
571 ..self
572 }
573 }
574
575 pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self {
576 Self {
577 router: self.router.nest(
578 &format!("/{HTTP_API_VERSION}/prometheus/api/v1"),
579 HttpServer::route_prometheus(handler),
580 ),
581 ..self
582 }
583 }
584
585 pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self {
586 Self {
587 router: self.router.nest(
588 &format!("/{HTTP_API_VERSION}/otlp"),
589 HttpServer::route_otlp(handler),
590 ),
591 ..self
592 }
593 }
594
595 pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self {
596 Self {
597 user_provider: Some(user_provider),
598 ..self
599 }
600 }
601
602 pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self {
603 Self {
604 router: self.router.merge(HttpServer::route_metrics(handler)),
605 ..self
606 }
607 }
608
609 pub fn with_log_ingest_handler(
610 self,
611 handler: PipelineHandlerRef,
612 validator: Option<LogValidatorRef>,
613 ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
614 ) -> Self {
615 let log_state = LogState {
616 log_handler: handler,
617 log_validator: validator,
618 ingest_interceptor,
619 };
620
621 let router = self.router.nest(
622 &format!("/{HTTP_API_VERSION}"),
623 HttpServer::route_pipelines(log_state.clone()),
624 );
625 let router = router.nest(
627 &format!("/{HTTP_API_VERSION}/events"),
628 #[allow(deprecated)]
629 HttpServer::route_log_deprecated(log_state.clone()),
630 );
631
632 let router = router.nest(
633 &format!("/{HTTP_API_VERSION}/loki"),
634 HttpServer::route_loki(log_state.clone()),
635 );
636
637 let router = router.nest(
638 &format!("/{HTTP_API_VERSION}/elasticsearch"),
639 HttpServer::route_elasticsearch(log_state.clone()),
640 );
641
642 let router = router.nest(
643 &format!("/{HTTP_API_VERSION}/elasticsearch/"),
644 Router::new()
645 .route("/", routing::get(elasticsearch::handle_get_version))
646 .with_state(log_state),
647 );
648
649 Self { router, ..self }
650 }
651
652 pub fn with_plugins(self, plugins: Plugins) -> Self {
653 Self { plugins, ..self }
654 }
655
656 pub fn with_greptime_config_options(self, opts: String) -> Self {
657 let config_router = HttpServer::route_config(GreptimeOptionsConfigState {
658 greptime_config_options: opts,
659 });
660
661 Self {
662 router: self.router.merge(config_router),
663 ..self
664 }
665 }
666
667 pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self {
668 Self {
669 router: self.router.nest(
670 &format!("/{HTTP_API_VERSION}/jaeger"),
671 HttpServer::route_jaeger(handler),
672 ),
673 ..self
674 }
675 }
676
677 pub fn with_extra_router(self, router: Router) -> Self {
678 Self {
679 router: self.router.merge(router),
680 ..self
681 }
682 }
683
684 pub fn build(self) -> HttpServer {
685 HttpServer {
686 options: self.options,
687 user_provider: self.user_provider,
688 shutdown_tx: Mutex::new(None),
689 plugins: self.plugins,
690 router: StdMutex::new(self.router),
691 bind_addr: None,
692 }
693 }
694}
695
impl HttpServer {
    /// Builds the complete axum application: the routes registered via the
    /// builder plus health/ready/status endpoints, the optional dashboard,
    /// and the per-route HTTP metrics layer.
    pub fn make_app(&self) -> Router {
        // Clone the router out of the mutex so the lock isn't held while the
        // app is being composed.
        let mut router = {
            let router = self.router.lock().unwrap();
            router.clone()
        };

        router = router
            .route(
                "/health",
                routing::get(handler::health).post(handler::health),
            )
            .route(
                "/ready",
                routing::get(handler::health).post(handler::health),
            );

        router = router.route("/status", routing::get(handler::status));

        #[cfg(feature = "dashboard")]
        {
            if !self.options.disable_dashboard {
                info!("Enable dashboard service at '/dashboard'");
                // Redirect bare "/dashboard" to "/dashboard/", preserving any
                // query string.
                router = router.route(
                    "/dashboard",
                    routing::get(|uri: axum::http::uri::Uri| async move {
                        let path = uri.path();
                        let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();

                        let new_uri = format!("{}/{}", path, query);
                        axum::response::Redirect::permanent(&new_uri)
                    }),
                );

                // Both "/dashboard/" and any deeper path serve the embedded
                // static assets.
                router = router
                    .route(
                        "/dashboard/",
                        routing::get(dashboard::static_handler).post(dashboard::static_handler),
                    )
                    .route(
                        "/dashboard/{*x}",
                        routing::get(dashboard::static_handler).post(dashboard::static_handler),
                    );
            }
        }

        // Record HTTP metrics for all routes registered above.
        router = router.route_layer(middleware::from_fn(http_metrics_layer));

        router
    }

    /// Wraps `router` with the global middleware stack (tracing, optional
    /// CORS/timeout/body-limit, auth, hint and read-preference extraction)
    /// and mounts the "/debug" utility routes.
    ///
    /// Returns an error if a configured CORS origin is not a valid header
    /// value.
    pub fn build(&self, router: Router) -> Result<Router> {
        // A zero timeout disables the timeout layer entirely.
        let timeout_layer = if self.options.timeout != Duration::default() {
            Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout)))
        } else {
            info!("HTTP server timeout is disabled");
            None
        };
        // A zero body limit disables the limit layer entirely.
        let body_limit_layer = if self.options.body_limit != ReadableSize(0) {
            Some(
                ServiceBuilder::new()
                    .layer(DefaultBodyLimit::max(self.options.body_limit.0 as usize)),
            )
        } else {
            info!("HTTP server body limit is disabled");
            None
        };
        // With no explicit origins configured, any origin is allowed.
        let cors_layer = if self.options.enable_cors {
            Some(
                CorsLayer::new()
                    .allow_methods([
                        Method::GET,
                        Method::POST,
                        Method::PUT,
                        Method::DELETE,
                        Method::HEAD,
                    ])
                    .allow_origin(if self.options.cors_allowed_origins.is_empty() {
                        AllowOrigin::from(Any)
                    } else {
                        AllowOrigin::from(
                            self.options
                                .cors_allowed_origins
                                .iter()
                                .map(|s| {
                                    HeaderValue::from_str(s.as_str())
                                        .context(InvalidHeaderValueSnafu)
                                })
                                .collect::<Result<Vec<HeaderValue>>>()?,
                        )
                    })
                    .allow_headers(Any),
            )
        } else {
            info!("HTTP server cross-origin is disabled");
            None
        };

        Ok(router
            .layer(
                ServiceBuilder::new()
                    .layer(TraceLayer::new_for_http().on_failure(()))
                    .option_layer(cors_layer)
                    .option_layer(timeout_layer)
                    .option_layer(body_limit_layer)
                    .layer(middleware::from_fn_with_state(
                        AuthState::new(self.user_provider.clone()),
                        authorize::check_http_auth,
                    ))
                    .layer(middleware::from_fn(hints::extract_hints))
                    .layer(middleware::from_fn(
                        read_preference::extract_read_preference,
                    )),
            )
            .nest(
                "/debug",
                Router::new()
                    .route("/log_level", routing::post(dyn_log::dyn_log_handler))
                    .nest(
                        "/prof",
                        Router::new()
                            .route("/cpu", routing::post(pprof::pprof_handler))
                            .route("/mem", routing::post(mem_prof::mem_prof_handler)),
                    ),
            ))
    }

    /// "/metrics" endpoint serving process metrics.
    fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
        Router::new()
            .route("/metrics", routing::get(handler::metrics))
            .with_state(metrics_handler)
    }

    /// Loki-compatible push endpoint, with request decompression.
    fn route_loki<S>(log_state: LogState) -> Router<S> {
        Router::new()
            .route("/api/v1/push", routing::post(loki::loki_ingest))
            .layer(
                ServiceBuilder::new()
                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
            )
            .with_state(log_state)
    }

    /// Elasticsearch-compatible surface: version/license probes, bulk
    /// ingest, and stubbed management endpoints that answer 200 with an
    /// empty JSON object so shippers like Logstash/Filebeat can connect.
    fn route_elasticsearch<S>(log_state: LogState) -> Router<S> {
        Router::new()
            // HEAD "/" is commonly used as an availability check.
            .route(
                "/",
                routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
            )
            .route("/", routing::get(elasticsearch::handle_get_version))
            .route("/_license", routing::get(elasticsearch::handle_get_license))
            .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
            .route(
                "/{index}/_bulk",
                routing::post(elasticsearch::handle_bulk_api_with_index),
            )
            // The endpoints below are accepted but intentionally no-ops.
            .route(
                "/_ilm/policy/{*path}",
                routing::any((
                    HttpStatusCode::OK,
                    elasticsearch::elasticsearch_headers(),
                    axum::Json(serde_json::json!({})),
                )),
            )
            .route(
                "/_index_template/{*path}",
                routing::any((
                    HttpStatusCode::OK,
                    elasticsearch::elasticsearch_headers(),
                    axum::Json(serde_json::json!({})),
                )),
            )
            .route(
                "/_ingest/{*path}",
                routing::any((
                    HttpStatusCode::OK,
                    elasticsearch::elasticsearch_headers(),
                    axum::Json(serde_json::json!({})),
                )),
            )
            .route(
                "/_nodes/{*path}",
                routing::any((
                    HttpStatusCode::OK,
                    elasticsearch::elasticsearch_headers(),
                    axum::Json(serde_json::json!({})),
                )),
            )
            .route(
                "/logstash/{*path}",
                routing::any((
                    HttpStatusCode::OK,
                    elasticsearch::elasticsearch_headers(),
                    axum::Json(serde_json::json!({})),
                )),
            )
            .route(
                "/_logstash/{*path}",
                routing::any((
                    HttpStatusCode::OK,
                    elasticsearch::elasticsearch_headers(),
                    axum::Json(serde_json::json!({})),
                )),
            )
            .layer(ServiceBuilder::new().layer(RequestDecompressionLayer::new()))
            .with_state(log_state)
    }

    /// Legacy "/events"-prefixed log ingest API, kept for compatibility.
    #[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")]
    fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
        Router::new()
            .route("/logs", routing::post(event::log_ingester))
            .route(
                "/pipelines/{pipeline_name}",
                routing::get(event::query_pipeline),
            )
            .route(
                "/pipelines/{pipeline_name}",
                routing::post(event::add_pipeline),
            )
            .route(
                "/pipelines/{pipeline_name}",
                routing::delete(event::delete_pipeline),
            )
            .route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
            .layer(
                ServiceBuilder::new()
                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
            )
            .with_state(log_state)
    }

    /// Current log ingest and pipeline management API.
    fn route_pipelines<S>(log_state: LogState) -> Router<S> {
        Router::new()
            .route("/ingest", routing::post(event::log_ingester))
            .route(
                "/pipelines/{pipeline_name}",
                routing::get(event::query_pipeline),
            )
            .route(
                "/pipelines/{pipeline_name}",
                routing::post(event::add_pipeline),
            )
            .route(
                "/pipelines/{pipeline_name}",
                routing::delete(event::delete_pipeline),
            )
            .route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun))
            .layer(
                ServiceBuilder::new()
                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
            )
            .with_state(log_state)
    }

    /// SQL, SQL-parse, and PromQL query endpoints.
    fn route_sql<S>(api_state: ApiState) -> Router<S> {
        Router::new()
            .route("/sql", routing::get(handler::sql).post(handler::sql))
            .route(
                "/sql/parse",
                routing::get(handler::sql_parse).post(handler::sql_parse),
            )
            .route(
                "/promql",
                routing::get(handler::promql).post(handler::promql),
            )
            .with_state(api_state)
    }

    /// Log query endpoint.
    fn route_logs<S>(log_handler: LogQueryHandlerRef) -> Router<S> {
        Router::new()
            .route("/logs", routing::get(logs::logs).post(logs::logs))
            .with_state(log_handler)
    }

    /// Prometheus HTTP API (instant/range queries, labels, series, ...),
    /// with response compression enabled.
    fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
        Router::new()
            .route(
                "/format_query",
                routing::post(format_query).get(format_query),
            )
            .route("/status/buildinfo", routing::get(build_info_query))
            .route("/query", routing::post(instant_query).get(instant_query))
            .route("/query_range", routing::post(range_query).get(range_query))
            .route("/labels", routing::post(labels_query).get(labels_query))
            .route("/series", routing::post(series_query).get(series_query))
            .route("/parse_query", routing::post(parse_query).get(parse_query))
            .route(
                "/label/{label_name}/values",
                routing::get(label_values_query),
            )
            .layer(ServiceBuilder::new().layer(CompressionLayer::new()))
            .with_state(prometheus_handler)
    }

    /// Prometheus remote read/write endpoints.
    fn route_prom<S>(state: PromStoreState) -> Router<S> {
        Router::new()
            .route("/read", routing::post(prom_store::remote_read))
            .route("/write", routing::post(prom_store::remote_write))
            .with_state(state)
    }

    /// InfluxDB line-protocol write endpoints (v1 and v2) plus ping/health.
    fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
        Router::new()
            .route("/write", routing::post(influxdb_write_v1))
            .route("/api/v2/write", routing::post(influxdb_write_v2))
            // Decompression applies only to the write routes added above.
            .layer(
                ServiceBuilder::new()
                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
            )
            .route("/ping", routing::get(influxdb_ping))
            .route("/health", routing::get(influxdb_health))
            .with_state(influxdb_handler)
    }

    /// OpenTSDB put endpoint.
    fn route_opentsdb<S>(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
        Router::new()
            .route("/api/put", routing::post(opentsdb::put))
            .with_state(opentsdb_handler)
    }

    /// OTLP/HTTP ingest endpoints for metrics, traces, and logs.
    fn route_otlp<S>(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
        Router::new()
            .route("/v1/metrics", routing::post(otlp::metrics))
            .route("/v1/traces", routing::post(otlp::traces))
            .route("/v1/logs", routing::post(otlp::logs))
            .layer(
                ServiceBuilder::new()
                    .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
            )
            .with_state(otlp_handler)
    }

    /// Endpoint serving the server's effective configuration.
    fn route_config<S>(state: GreptimeOptionsConfigState) -> Router<S> {
        Router::new()
            .route("/config", routing::get(handler::config))
            .with_state(state)
    }

    /// Jaeger query-service-compatible endpoints for trace exploration.
    fn route_jaeger<S>(handler: JaegerQueryHandlerRef) -> Router<S> {
        Router::new()
            .route("/api/services", routing::get(jaeger::handle_get_services))
            .route(
                "/api/services/{service_name}/operations",
                routing::get(jaeger::handle_get_operations_by_service),
            )
            .route(
                "/api/operations",
                routing::get(jaeger::handle_get_operations),
            )
            .route("/api/traces", routing::get(jaeger::handle_find_traces))
            .route(
                "/api/traces/{trace_id}",
                routing::get(jaeger::handle_get_trace),
            )
            .with_state(handler)
    }
}
1087
1088pub const HTTP_SERVER: &str = "HTTP_SERVER";
1089
1090#[async_trait]
1091impl Server for HttpServer {
1092 async fn shutdown(&self) -> Result<()> {
1093 let mut shutdown_tx = self.shutdown_tx.lock().await;
1094 if let Some(tx) = shutdown_tx.take() {
1095 if tx.send(()).is_err() {
1096 info!("Receiver dropped, the HTTP server has already existed");
1097 }
1098 }
1099 info!("Shutdown HTTP server");
1100
1101 Ok(())
1102 }
1103
1104 async fn start(&mut self, listening: SocketAddr) -> Result<()> {
1105 let (tx, rx) = oneshot::channel();
1106 let serve = {
1107 let mut shutdown_tx = self.shutdown_tx.lock().await;
1108 ensure!(
1109 shutdown_tx.is_none(),
1110 AlreadyStartedSnafu { server: "HTTP" }
1111 );
1112
1113 let mut app = self.make_app();
1114 if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
1115 app = configurator.config_http(app);
1116 }
1117 let app = self.build(app)?;
1118 let listener = tokio::net::TcpListener::bind(listening)
1119 .await
1120 .context(AddressBindSnafu { addr: listening })?
1121 .tap_io(|tcp_stream| {
1122 if let Err(e) = tcp_stream.set_nodelay(true) {
1123 error!(e; "Failed to set TCP_NODELAY on incoming connection");
1124 }
1125 });
1126 let serve = axum::serve(listener, app.into_make_service());
1127
1128 *shutdown_tx = Some(tx);
1145
1146 serve
1147 };
1148 let listening = serve.local_addr().context(InternalIoSnafu)?;
1149 info!("HTTP server is bound to {}", listening);
1150
1151 common_runtime::spawn_global(async move {
1152 if let Err(e) = serve
1153 .with_graceful_shutdown(rx.map(drop))
1154 .await
1155 .context(InternalIoSnafu)
1156 {
1157 error!(e; "Failed to shutdown http server");
1158 }
1159 });
1160
1161 self.bind_addr = Some(listening);
1162 Ok(())
1163 }
1164
1165 fn name(&self) -> &str {
1166 HTTP_SERVER
1167 }
1168
1169 fn bind_addr(&self) -> Option<SocketAddr> {
1170 self.bind_addr
1171 }
1172}
1173
1174#[cfg(test)]
1175mod test {
1176 use std::future::pending;
1177 use std::io::Cursor;
1178 use std::sync::Arc;
1179
1180 use arrow_ipc::reader::FileReader;
1181 use arrow_schema::DataType;
1182 use axum::handler::Handler;
1183 use axum::http::StatusCode;
1184 use axum::routing::get;
1185 use common_query::Output;
1186 use common_recordbatch::RecordBatches;
1187 use datafusion_expr::LogicalPlan;
1188 use datatypes::prelude::*;
1189 use datatypes::schema::{ColumnSchema, Schema};
1190 use datatypes::vectors::{StringVector, UInt32Vector};
1191 use header::constants::GREPTIME_DB_HEADER_TIMEOUT;
1192 use query::parser::PromQuery;
1193 use query::query_engine::DescribeResult;
1194 use session::context::QueryContextRef;
1195 use tokio::sync::mpsc;
1196 use tokio::time::Instant;
1197
1198 use super::*;
1199 use crate::error::Error;
1200 use crate::http::test_helpers::TestClient;
1201 use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
1202
1203 struct DummyInstance {
1204 _tx: mpsc::Sender<(String, Vec<u8>)>,
1205 }
1206
    #[async_trait]
    impl SqlQueryHandler for DummyInstance {
        type Error = Error;

        // The tests in this module exercise only HTTP-level behavior (CORS,
        // timeouts, response formatting), so the query entry points below are
        // never expected to be reached.
        async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
            unimplemented!()
        }

        async fn do_promql_query(
            &self,
            _: &PromQuery,
            _: QueryContextRef,
        ) -> Vec<std::result::Result<Output, Self::Error>> {
            unimplemented!()
        }

        async fn do_exec_plan(
            &self,
            _plan: LogicalPlan,
            _query_ctx: QueryContextRef,
        ) -> std::result::Result<Output, Self::Error> {
            unimplemented!()
        }

        async fn do_describe(
            &self,
            _stmt: sql::statements::statement::Statement,
            _query_ctx: QueryContextRef,
        ) -> Result<Option<DescribeResult>> {
            unimplemented!()
        }

        // Report every schema as valid so requests pass validation and reach
        // the middleware under test.
        async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
            Ok(true)
        }
    }
1243
1244 fn timeout() -> DynamicTimeoutLayer {
1245 DynamicTimeoutLayer::new(Duration::from_millis(10))
1246 }
1247
1248 async fn forever() {
1249 pending().await
1250 }
1251
1252 fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
1253 make_test_app_custom(tx, HttpOptions::default())
1254 }
1255
1256 fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
1257 let instance = Arc::new(DummyInstance { _tx: tx });
1258 let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
1259 let server = HttpServerBuilder::new(options)
1260 .with_sql_handler(sql_instance)
1261 .build();
1262 server.build(server.make_app()).unwrap().route(
1263 "/test/timeout",
1264 get(forever.layer(ServiceBuilder::new().layer(timeout()))),
1265 )
1266 }
1267
1268 #[tokio::test]
1269 pub async fn test_cors() {
1270 let (tx, _rx) = mpsc::channel(100);
1272 let app = make_test_app(tx);
1273 let client = TestClient::new(app).await;
1274
1275 let res = client.get("/health").send().await;
1276
1277 assert_eq!(res.status(), StatusCode::OK);
1278 assert_eq!(
1279 res.headers()
1280 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1281 .expect("expect cors header origin"),
1282 "*"
1283 );
1284
1285 let res = client
1286 .options("/health")
1287 .header("Access-Control-Request-Headers", "x-greptime-auth")
1288 .header("Access-Control-Request-Method", "DELETE")
1289 .header("Origin", "https://example.com")
1290 .send()
1291 .await;
1292 assert_eq!(res.status(), StatusCode::OK);
1293 assert_eq!(
1294 res.headers()
1295 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1296 .expect("expect cors header origin"),
1297 "*"
1298 );
1299 assert_eq!(
1300 res.headers()
1301 .get(http::header::ACCESS_CONTROL_ALLOW_HEADERS)
1302 .expect("expect cors header headers"),
1303 "*"
1304 );
1305 assert_eq!(
1306 res.headers()
1307 .get(http::header::ACCESS_CONTROL_ALLOW_METHODS)
1308 .expect("expect cors header methods"),
1309 "GET,POST,PUT,DELETE,HEAD"
1310 );
1311 }
1312
1313 #[tokio::test]
1314 pub async fn test_cors_custom_origins() {
1315 let (tx, _rx) = mpsc::channel(100);
1317 let origin = "https://example.com";
1318
1319 let options = HttpOptions {
1320 cors_allowed_origins: vec![origin.to_string()],
1321 ..Default::default()
1322 };
1323
1324 let app = make_test_app_custom(tx, options);
1325 let client = TestClient::new(app).await;
1326
1327 let res = client.get("/health").header("Origin", origin).send().await;
1328
1329 assert_eq!(res.status(), StatusCode::OK);
1330 assert_eq!(
1331 res.headers()
1332 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1333 .expect("expect cors header origin"),
1334 origin
1335 );
1336
1337 let res = client
1338 .get("/health")
1339 .header("Origin", "https://notallowed.com")
1340 .send()
1341 .await;
1342
1343 assert_eq!(res.status(), StatusCode::OK);
1344 assert!(!res
1345 .headers()
1346 .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1347 }
1348
1349 #[tokio::test]
1350 pub async fn test_cors_disabled() {
1351 let (tx, _rx) = mpsc::channel(100);
1353
1354 let options = HttpOptions {
1355 enable_cors: false,
1356 ..Default::default()
1357 };
1358
1359 let app = make_test_app_custom(tx, options);
1360 let client = TestClient::new(app).await;
1361
1362 let res = client.get("/health").send().await;
1363
1364 assert_eq!(res.status(), StatusCode::OK);
1365 assert!(!res
1366 .headers()
1367 .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
1368 }
1369
1370 #[test]
1371 fn test_http_options_default() {
1372 let default = HttpOptions::default();
1373 assert_eq!("127.0.0.1:4000".to_string(), default.addr);
1374 assert_eq!(Duration::from_secs(0), default.timeout)
1375 }
1376
1377 #[tokio::test]
1378 async fn test_http_server_request_timeout() {
1379 common_telemetry::init_default_ut_logging();
1380
1381 let (tx, _rx) = mpsc::channel(100);
1382 let app = make_test_app(tx);
1383 let client = TestClient::new(app).await;
1384 let res = client.get("/test/timeout").send().await;
1385 assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1386
1387 let now = Instant::now();
1388 let res = client
1389 .get("/test/timeout")
1390 .header(GREPTIME_DB_HEADER_TIMEOUT, "20ms")
1391 .send()
1392 .await;
1393 assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1394 let elapsed = now.elapsed();
1395 assert!(elapsed > Duration::from_millis(15));
1396
1397 tokio::time::timeout(
1398 Duration::from_millis(15),
1399 client
1400 .get("/test/timeout")
1401 .header(GREPTIME_DB_HEADER_TIMEOUT, "0s")
1402 .send(),
1403 )
1404 .await
1405 .unwrap_err();
1406
1407 tokio::time::timeout(
1408 Duration::from_millis(15),
1409 client
1410 .get("/test/timeout")
1411 .header(
1412 GREPTIME_DB_HEADER_TIMEOUT,
1413 humantime::format_duration(Duration::default()).to_string(),
1414 )
1415 .send(),
1416 )
1417 .await
1418 .unwrap_err();
1419 }
1420
1421 #[tokio::test]
1422 async fn test_schema_for_empty_response() {
1423 let column_schemas = vec![
1424 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1425 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1426 ];
1427 let schema = Arc::new(Schema::new(column_schemas));
1428
1429 let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
1430 let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1431
1432 let json_resp = GreptimedbV1Response::from_output(outputs).await;
1433 if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
1434 let json_output = &json_resp.output[0];
1435 if let GreptimeQueryOutput::Records(r) = json_output {
1436 assert_eq!(r.num_rows(), 0);
1437 assert_eq!(r.num_cols(), 2);
1438 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1439 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1440 } else {
1441 panic!("invalid output type");
1442 }
1443 } else {
1444 panic!("invalid format")
1445 }
1446 }
1447
1448 #[tokio::test]
1449 async fn test_recordbatches_conversion() {
1450 let column_schemas = vec![
1451 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1452 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1453 ];
1454 let schema = Arc::new(Schema::new(column_schemas));
1455 let columns: Vec<VectorRef> = vec![
1456 Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
1457 Arc::new(StringVector::from(vec![
1458 None,
1459 Some("hello"),
1460 Some("greptime"),
1461 None,
1462 ])),
1463 ];
1464 let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
1465
1466 for format in [
1467 ResponseFormat::GreptimedbV1,
1468 ResponseFormat::InfluxdbV1,
1469 ResponseFormat::Csv,
1470 ResponseFormat::Table,
1471 ResponseFormat::Arrow,
1472 ResponseFormat::Json,
1473 ] {
1474 let recordbatches =
1475 RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
1476 let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1477 let json_resp = match format {
1478 ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
1479 ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
1480 ResponseFormat::Table => TableResponse::from_output(outputs).await,
1481 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
1482 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
1483 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
1484 };
1485
1486 match json_resp {
1487 HttpResponse::GreptimedbV1(resp) => {
1488 let json_output = &resp.output[0];
1489 if let GreptimeQueryOutput::Records(r) = json_output {
1490 assert_eq!(r.num_rows(), 4);
1491 assert_eq!(r.num_cols(), 2);
1492 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1493 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1494 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1495 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1496 } else {
1497 panic!("invalid output type");
1498 }
1499 }
1500 HttpResponse::InfluxdbV1(resp) => {
1501 let json_output = &resp.results()[0];
1502 assert_eq!(json_output.num_rows(), 4);
1503 assert_eq!(json_output.num_cols(), 2);
1504 assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
1505 assert_eq!(
1506 json_output.series[0].values[0][0],
1507 serde_json::Value::from(1)
1508 );
1509 assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
1510 }
1511 HttpResponse::Csv(resp) => {
1512 let output = &resp.output()[0];
1513 if let GreptimeQueryOutput::Records(r) = output {
1514 assert_eq!(r.num_rows(), 4);
1515 assert_eq!(r.num_cols(), 2);
1516 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1517 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1518 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1519 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1520 } else {
1521 panic!("invalid output type");
1522 }
1523 }
1524
1525 HttpResponse::Table(resp) => {
1526 let output = &resp.output()[0];
1527 if let GreptimeQueryOutput::Records(r) = output {
1528 assert_eq!(r.num_rows(), 4);
1529 assert_eq!(r.num_cols(), 2);
1530 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1531 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1532 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1533 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1534 } else {
1535 panic!("invalid output type");
1536 }
1537 }
1538
1539 HttpResponse::Arrow(resp) => {
1540 let output = resp.data;
1541 let mut reader =
1542 FileReader::try_new(Cursor::new(output), None).expect("Arrow reader error");
1543 let schema = reader.schema();
1544 assert_eq!(schema.fields[0].name(), "numbers");
1545 assert_eq!(schema.fields[0].data_type(), &DataType::UInt32);
1546 assert_eq!(schema.fields[1].name(), "strings");
1547 assert_eq!(schema.fields[1].data_type(), &DataType::Utf8);
1548
1549 let rb = reader.next().unwrap().expect("read record batch failed");
1550 assert_eq!(rb.num_columns(), 2);
1551 assert_eq!(rb.num_rows(), 4);
1552 }
1553
1554 HttpResponse::Json(resp) => {
1555 let output = &resp.output()[0];
1556 if let GreptimeQueryOutput::Records(r) = output {
1557 assert_eq!(r.num_rows(), 4);
1558 assert_eq!(r.num_cols(), 2);
1559 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1560 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1561 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1562 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1563 } else {
1564 panic!("invalid output type");
1565 }
1566 }
1567
1568 HttpResponse::Error(err) => unreachable!("{err:?}"),
1569 }
1570 }
1571 }
1572}