1use std::collections::HashMap;
16use std::convert::Infallible;
17use std::fmt::Display;
18use std::net::SocketAddr;
19use std::sync::Mutex as StdMutex;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use auth::UserProviderRef;
24use axum::extract::{DefaultBodyLimit, Request};
25use axum::http::StatusCode as HttpStatusCode;
26use axum::response::{IntoResponse, Response};
27use axum::routing::Route;
28use axum::serve::ListenerExt;
29use axum::{Router, middleware, routing};
30use common_base::Plugins;
31use common_base::readable_size::ReadableSize;
32use common_recordbatch::RecordBatch;
33use common_telemetry::{debug, error, info};
34use common_time::Timestamp;
35use common_time::timestamp::TimeUnit;
36use datatypes::data_type::DataType;
37use datatypes::schema::SchemaRef;
38use event::{LogState, LogValidatorRef};
39use futures::FutureExt;
40use http::{HeaderValue, Method};
41use prost::DecodeError;
42use serde::{Deserialize, Serialize};
43use serde_json::Value;
44use snafu::{ResultExt, ensure};
45use tokio::sync::Mutex;
46use tokio::sync::oneshot::{self, Sender};
47use tonic::codegen::Service;
48use tower::{Layer, ServiceBuilder};
49use tower_http::compression::CompressionLayer;
50use tower_http::cors::{AllowOrigin, Any, CorsLayer};
51use tower_http::decompression::RequestDecompressionLayer;
52use tower_http::trace::TraceLayer;
53
54use self::authorize::AuthState;
55use self::result::table_result::TableResponse;
56use crate::configurator::HttpConfiguratorRef;
57use crate::elasticsearch;
58use crate::error::{
59 AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu,
60 OtherSnafu, Result,
61};
62use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
63use crate::http::otlp::OtlpState;
64use crate::http::prom_store::PromStoreState;
65use crate::http::prometheus::{
66 build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
67 range_query, series_query,
68};
69use crate::http::result::arrow_result::ArrowResponse;
70use crate::http::result::csv_result::CsvResponse;
71use crate::http::result::error_result::ErrorResponse;
72use crate::http::result::greptime_result_v1::GreptimedbV1Response;
73use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
74use crate::http::result::json_result::JsonResponse;
75use crate::http::result::null_result::NullResponse;
76use crate::interceptor::LogIngestInterceptorRef;
77use crate::metrics::http_metrics_layer;
78use crate::metrics_handler::MetricsHandler;
79use crate::prometheus_handler::PrometheusHandlerRef;
80use crate::query_handler::sql::ServerSqlQueryHandlerRef;
81use crate::query_handler::{
82 InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
83 OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
84 PromStoreProtocolHandlerRef,
85};
86use crate::request_memory_limiter::ServerMemoryLimiter;
87use crate::server::Server;
88
89pub mod authorize;
90#[cfg(feature = "dashboard")]
91mod dashboard;
92pub mod dyn_log;
93pub mod dyn_trace;
94pub mod event;
95pub mod extractor;
96pub mod handler;
97pub mod header;
98pub mod influxdb;
99pub mod jaeger;
100pub mod logs;
101pub mod loki;
102pub mod mem_prof;
103mod memory_limit;
104pub mod opentsdb;
105pub mod otlp;
106pub mod pprof;
107pub mod prom_store;
108pub mod prometheus;
109pub mod result;
110mod timeout;
111pub mod utils;
112
113use result::HttpOutputWriter;
114pub(crate) use timeout::DynamicTimeoutLayer;
115
116mod hints;
117mod read_preference;
118#[cfg(any(test, feature = "testing"))]
119pub mod test_helpers;
120
/// API version segment; handler routers are nested under `/{HTTP_API_VERSION}`.
pub const HTTP_API_VERSION: &str = "v1";
/// The API version written as a path prefix with leading and trailing slashes.
pub const HTTP_API_PREFIX: &str = "/v1/";
/// Default request body limit (64 MB) used by `HttpOptions::default()`.
const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64);

/// Name of the GreptimeDB-specific auth header.
// NOTE(review): presumably consumed by the `authorize` module — confirm there.
pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth";

/// Paths considered public.
// NOTE(review): presumably these bypass authentication — confirm in `authorize`.
pub static PUBLIC_APIS: [&str; 3] = ["/v1/influxdb/ping", "/v1/influxdb/health", "/v1/health"];
131
/// HTTP server: owns the assembled router and the state needed to start and stop serving it.
#[derive(Default)]
pub struct HttpServer {
    /// Base router produced by `HttpServerBuilder`; `make_app` clones it under the lock.
    router: StdMutex<Router>,
    /// Sender used to trigger graceful shutdown; set by `start`, taken by `shutdown`.
    shutdown_tx: Mutex<Option<Sender<()>>>,
    /// Optional user provider handed to the auth middleware state in `build`.
    user_provider: Option<UserProviderRef>,
    /// State handed to the memory-limit middleware in `build`.
    memory_limiter: ServerMemoryLimiter,

    /// Plugin registry; `start` looks up an optional `HttpConfiguratorRef<()>` in it.
    plugins: Plugins,

    /// Server options (timeout, body limit, CORS, dashboard switch).
    options: HttpOptions,
    /// Actual bound address, recorded by `start` once the listener is bound.
    bind_addr: Option<SocketAddr>,
}
146
/// Configuration of the HTTP server. Missing fields fall back to `Default` (`#[serde(default)]`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpOptions {
    /// Server address string; defaults to `127.0.0.1:4000`.
    pub addr: String,

    /// Request timeout in humantime format; a zero duration disables the timeout layer.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,

    /// When true the dashboard routes are not registered. Never (de)serialized.
    #[serde(skip)]
    pub disable_dashboard: bool,

    /// Maximum request body size; a zero size disables the body limit layer.
    pub body_limit: ReadableSize,

    /// How byte strings are validated as UTF-8; handed to the Prometheus store state.
    pub prom_validation_mode: PromValidationMode,

    /// Allowed CORS origins; an empty list allows any origin.
    pub cors_allowed_origins: Vec<String>,

    /// Whether the CORS layer is installed at all.
    pub enable_cors: bool,
}
167
/// UTF-8 validation strategy applied by `decode_string`. Serialized in `snake_case`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PromValidationMode {
    /// Reject byte strings that are not valid UTF-8.
    Strict,
    /// Accept any bytes, decoding them lossily.
    Lossy,
    /// Skip validation entirely (the bytes are trusted to be UTF-8).
    Unchecked,
}
178
179impl PromValidationMode {
180 pub fn decode_string(&self, bytes: &[u8]) -> std::result::Result<String, DecodeError> {
182 let result = match self {
183 PromValidationMode::Strict => match String::from_utf8(bytes.to_vec()) {
184 Ok(s) => s,
185 Err(e) => {
186 debug!("Invalid UTF-8 string value: {:?}, error: {:?}", bytes, e);
187 return Err(DecodeError::new("invalid utf-8"));
188 }
189 },
190 PromValidationMode::Lossy => String::from_utf8_lossy(bytes).to_string(),
191 PromValidationMode::Unchecked => unsafe { String::from_utf8_unchecked(bytes.to_vec()) },
192 };
193 Ok(result)
194 }
195}
196
197impl Default for HttpOptions {
198 fn default() -> Self {
199 Self {
200 addr: "127.0.0.1:4000".to_string(),
201 timeout: Duration::from_secs(0),
202 disable_dashboard: false,
203 body_limit: DEFAULT_BODY_LIMIT,
204 cors_allowed_origins: Vec::new(),
205 enable_cors: true,
206 prom_validation_mode: PromValidationMode::Strict,
207 }
208 }
209}
210
/// Name and data type name of one output column.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ColumnSchema {
    /// Column name.
    name: String,
    /// Name of the column's data type, as a string.
    data_type: String,
}
216
217impl ColumnSchema {
218 pub fn new(name: String, data_type: String) -> ColumnSchema {
219 ColumnSchema { name, data_type }
220 }
221}
222
/// Schema of a query result: the list of output columns.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct OutputSchema {
    /// One entry per output column.
    column_schemas: Vec<ColumnSchema>,
}
227
228impl OutputSchema {
229 pub fn new(columns: Vec<ColumnSchema>) -> OutputSchema {
230 OutputSchema {
231 column_schemas: columns,
232 }
233 }
234}
235
236impl From<SchemaRef> for OutputSchema {
237 fn from(schema: SchemaRef) -> OutputSchema {
238 OutputSchema {
239 column_schemas: schema
240 .column_schemas()
241 .iter()
242 .map(|cs| ColumnSchema {
243 name: cs.name.clone(),
244 data_type: cs.data_type.name(),
245 })
246 .collect(),
247 }
248 }
249}
250
/// Tabular query result as returned over HTTP.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct HttpRecordsOutput {
    /// Column names and types of `rows`.
    schema: OutputSchema,
    /// Result rows; each inner vector holds the values of one row.
    rows: Vec<Vec<Value>>,
    /// Number of rows; `try_new` sets it to `rows.len()` and `process_with_limit`
    /// lowers it when it truncates. Defaults to 0 when absent on deserialization.
    #[serde(default)]
    total_rows: usize,

    /// Extra metrics; omitted from the serialized form when empty and defaulted
    /// to an empty map when absent on deserialization.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    #[serde(default)]
    metrics: HashMap<String, Value>,
}
265
266impl HttpRecordsOutput {
267 pub fn num_rows(&self) -> usize {
268 self.rows.len()
269 }
270
271 pub fn num_cols(&self) -> usize {
272 self.schema.column_schemas.len()
273 }
274
275 pub fn schema(&self) -> &OutputSchema {
276 &self.schema
277 }
278
279 pub fn rows(&self) -> &Vec<Vec<Value>> {
280 &self.rows
281 }
282}
283
284impl HttpRecordsOutput {
285 pub fn try_new(
286 schema: SchemaRef,
287 recordbatches: Vec<RecordBatch>,
288 ) -> std::result::Result<HttpRecordsOutput, Error> {
289 if recordbatches.is_empty() {
290 Ok(HttpRecordsOutput {
291 schema: OutputSchema::from(schema),
292 rows: vec![],
293 total_rows: 0,
294 metrics: Default::default(),
295 })
296 } else {
297 let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
298 let mut rows = Vec::with_capacity(num_rows);
299
300 for recordbatch in recordbatches {
301 let mut writer = HttpOutputWriter::new(schema.num_columns(), None);
302 writer.write(recordbatch, &mut rows)?;
303 }
304
305 Ok(HttpRecordsOutput {
306 schema: OutputSchema::from(schema),
307 total_rows: rows.len(),
308 rows,
309 metrics: Default::default(),
310 })
311 }
312 }
313}
314
/// Result of one executed statement. Variant names are (de)serialized in lowercase.
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum GreptimeQueryOutput {
    /// Number of rows affected by the statement.
    AffectedRows(usize),
    /// Tabular result.
    Records(HttpRecordsOutput),
}
321
/// Output format of an HTTP query response. `GreptimedbV1` is the default.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
    Arrow,
    /// CSV; the flags are (with column names, with column types), following the
    /// `csvwithnames` / `csvwithnamesandtypes` names accepted by `parse`.
    Csv(bool, bool),
    Table,
    #[default]
    GreptimedbV1,
    InfluxdbV1,
    Json,
    Null,
}

impl ResponseFormat {
    /// Parses a format name (exact, case-sensitive match); `None` if unknown.
    pub fn parse(s: &str) -> Option<Self> {
        // Every accepted name paired with the format it selects.
        const NAMED_FORMATS: [(&str, ResponseFormat); 9] = [
            ("arrow", ResponseFormat::Arrow),
            ("csv", ResponseFormat::Csv(false, false)),
            ("csvwithnames", ResponseFormat::Csv(true, false)),
            ("csvwithnamesandtypes", ResponseFormat::Csv(true, true)),
            ("table", ResponseFormat::Table),
            ("greptimedb_v1", ResponseFormat::GreptimedbV1),
            ("influxdb_v1", ResponseFormat::InfluxdbV1),
            ("json", ResponseFormat::Json),
            ("null", ResponseFormat::Null),
        ];

        NAMED_FORMATS
            .iter()
            .find(|(name, _)| *name == s)
            .map(|&(_, format)| format)
    }

    /// Canonical name of the format; every CSV flavor is reported as `"csv"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Arrow => "arrow",
            Self::Csv(..) => "csv",
            Self::Table => "table",
            Self::GreptimedbV1 => "greptimedb_v1",
            Self::InfluxdbV1 => "influxdb_v1",
            Self::Json => "json",
            Self::Null => "null",
        }
    }
}
364
/// Timestamp precision that can be requested for a response (see `Epoch::parse`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Epoch {
    /// Parsed from `"ns"`.
    Nanosecond,
    /// Parsed from `"u"` or `"µ"`.
    Microsecond,
    /// Parsed from `"ms"`.
    Millisecond,
    /// Parsed from `"s"`.
    Second,
}
372
373impl Epoch {
374 pub fn parse(s: &str) -> Option<Epoch> {
375 match s {
380 "ns" => Some(Epoch::Nanosecond),
381 "u" | "µ" => Some(Epoch::Microsecond),
382 "ms" => Some(Epoch::Millisecond),
383 "s" => Some(Epoch::Second),
384 _ => None, }
386 }
387
388 pub fn convert_timestamp(&self, ts: Timestamp) -> Option<Timestamp> {
389 match self {
390 Epoch::Nanosecond => ts.convert_to(TimeUnit::Nanosecond),
391 Epoch::Microsecond => ts.convert_to(TimeUnit::Microsecond),
392 Epoch::Millisecond => ts.convert_to(TimeUnit::Millisecond),
393 Epoch::Second => ts.convert_to(TimeUnit::Second),
394 }
395 }
396}
397
398impl Display for Epoch {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 match self {
401 Epoch::Nanosecond => write!(f, "Epoch::Nanosecond"),
402 Epoch::Microsecond => write!(f, "Epoch::Microsecond"),
403 Epoch::Millisecond => write!(f, "Epoch::Millisecond"),
404 Epoch::Second => write!(f, "Epoch::Second"),
405 }
406 }
407}
408
/// A response in one of the supported output formats, or an error response.
/// Each variant wraps the format-specific response type.
#[derive(Serialize, Deserialize, Debug)]
pub enum HttpResponse {
    Arrow(ArrowResponse),
    Csv(CsvResponse),
    Table(TableResponse),
    Error(ErrorResponse),
    GreptimedbV1(GreptimedbV1Response),
    InfluxdbV1(InfluxdbV1Response),
    Json(JsonResponse),
    Null(NullResponse),
}
420
421impl HttpResponse {
422 pub fn with_execution_time(self, execution_time: u64) -> Self {
423 match self {
424 HttpResponse::Arrow(resp) => resp.with_execution_time(execution_time).into(),
425 HttpResponse::Csv(resp) => resp.with_execution_time(execution_time).into(),
426 HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
427 HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
428 HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
429 HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
430 HttpResponse::Null(resp) => resp.with_execution_time(execution_time).into(),
431 HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
432 }
433 }
434
435 pub fn with_limit(self, limit: usize) -> Self {
436 match self {
437 HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
438 HttpResponse::Table(resp) => resp.with_limit(limit).into(),
439 HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
440 HttpResponse::Json(resp) => resp.with_limit(limit).into(),
441 _ => self,
442 }
443 }
444}
445
446pub fn process_with_limit(
447 mut outputs: Vec<GreptimeQueryOutput>,
448 limit: usize,
449) -> Vec<GreptimeQueryOutput> {
450 outputs
451 .drain(..)
452 .map(|data| match data {
453 GreptimeQueryOutput::Records(mut records) => {
454 if records.rows.len() > limit {
455 records.rows.truncate(limit);
456 records.total_rows = limit;
457 }
458 GreptimeQueryOutput::Records(records)
459 }
460 _ => data,
461 })
462 .collect()
463}
464
465impl IntoResponse for HttpResponse {
466 fn into_response(self) -> Response {
467 match self {
468 HttpResponse::Arrow(resp) => resp.into_response(),
469 HttpResponse::Csv(resp) => resp.into_response(),
470 HttpResponse::Table(resp) => resp.into_response(),
471 HttpResponse::GreptimedbV1(resp) => resp.into_response(),
472 HttpResponse::InfluxdbV1(resp) => resp.into_response(),
473 HttpResponse::Json(resp) => resp.into_response(),
474 HttpResponse::Null(resp) => resp.into_response(),
475 HttpResponse::Error(resp) => resp.into_response(),
476 }
477 }
478}
479
480impl From<ArrowResponse> for HttpResponse {
481 fn from(value: ArrowResponse) -> Self {
482 HttpResponse::Arrow(value)
483 }
484}
485
486impl From<CsvResponse> for HttpResponse {
487 fn from(value: CsvResponse) -> Self {
488 HttpResponse::Csv(value)
489 }
490}
491
492impl From<TableResponse> for HttpResponse {
493 fn from(value: TableResponse) -> Self {
494 HttpResponse::Table(value)
495 }
496}
497
498impl From<ErrorResponse> for HttpResponse {
499 fn from(value: ErrorResponse) -> Self {
500 HttpResponse::Error(value)
501 }
502}
503
504impl From<GreptimedbV1Response> for HttpResponse {
505 fn from(value: GreptimedbV1Response) -> Self {
506 HttpResponse::GreptimedbV1(value)
507 }
508}
509
510impl From<InfluxdbV1Response> for HttpResponse {
511 fn from(value: InfluxdbV1Response) -> Self {
512 HttpResponse::InfluxdbV1(value)
513 }
514}
515
516impl From<JsonResponse> for HttpResponse {
517 fn from(value: JsonResponse) -> Self {
518 HttpResponse::Json(value)
519 }
520}
521
522impl From<NullResponse> for HttpResponse {
523 fn from(value: NullResponse) -> Self {
524 HttpResponse::Null(value)
525 }
526}
527
/// Shared state of the SQL/PromQL routes built by `HttpServer::route_sql`.
#[derive(Clone)]
pub struct ApiState {
    /// Handler that executes the queries.
    pub sql_handler: ServerSqlQueryHandlerRef,
}
532
/// Shared state of the `/config` route built by `HttpServer::route_config`.
#[derive(Clone)]
pub struct GreptimeOptionsConfigState {
    /// The configuration options, kept as a string.
    pub greptime_config_options: String,
}
537
/// Builder that accumulates routes and settings and finally produces an `HttpServer`.
pub struct HttpServerBuilder {
    /// Options passed on to the server.
    options: HttpOptions,
    /// Plugin registry passed on to the server.
    plugins: Plugins,
    /// Optional user provider passed on to the server.
    user_provider: Option<UserProviderRef>,
    /// Router being assembled by the `with_*` methods.
    router: Router,
    /// Memory limiter passed on to the server.
    memory_limiter: ServerMemoryLimiter,
}
545
546impl HttpServerBuilder {
547 pub fn new(options: HttpOptions) -> Self {
548 Self {
549 options,
550 plugins: Plugins::default(),
551 user_provider: None,
552 router: Router::new(),
553 memory_limiter: ServerMemoryLimiter::default(),
554 }
555 }
556
557 pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
559 self.memory_limiter = limiter;
560 self
561 }
562
563 pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
564 let sql_router = HttpServer::route_sql(ApiState { sql_handler });
565
566 Self {
567 router: self
568 .router
569 .nest(&format!("/{HTTP_API_VERSION}"), sql_router),
570 ..self
571 }
572 }
573
574 pub fn with_logs_handler(self, logs_handler: LogQueryHandlerRef) -> Self {
575 let logs_router = HttpServer::route_logs(logs_handler);
576
577 Self {
578 router: self
579 .router
580 .nest(&format!("/{HTTP_API_VERSION}"), logs_router),
581 ..self
582 }
583 }
584
585 pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self {
586 Self {
587 router: self.router.nest(
588 &format!("/{HTTP_API_VERSION}/opentsdb"),
589 HttpServer::route_opentsdb(handler),
590 ),
591 ..self
592 }
593 }
594
595 pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self {
596 Self {
597 router: self.router.nest(
598 &format!("/{HTTP_API_VERSION}/influxdb"),
599 HttpServer::route_influxdb(handler),
600 ),
601 ..self
602 }
603 }
604
605 pub fn with_prom_handler(
606 self,
607 handler: PromStoreProtocolHandlerRef,
608 pipeline_handler: Option<PipelineHandlerRef>,
609 prom_store_with_metric_engine: bool,
610 prom_validation_mode: PromValidationMode,
611 ) -> Self {
612 let state = PromStoreState {
613 prom_store_handler: handler,
614 pipeline_handler,
615 prom_store_with_metric_engine,
616 prom_validation_mode,
617 };
618
619 Self {
620 router: self.router.nest(
621 &format!("/{HTTP_API_VERSION}/prometheus"),
622 HttpServer::route_prom(state),
623 ),
624 ..self
625 }
626 }
627
628 pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self {
629 Self {
630 router: self.router.nest(
631 &format!("/{HTTP_API_VERSION}/prometheus/api/v1"),
632 HttpServer::route_prometheus(handler),
633 ),
634 ..self
635 }
636 }
637
638 pub fn with_otlp_handler(
639 self,
640 handler: OpenTelemetryProtocolHandlerRef,
641 with_metric_engine: bool,
642 ) -> Self {
643 Self {
644 router: self.router.nest(
645 &format!("/{HTTP_API_VERSION}/otlp"),
646 HttpServer::route_otlp(handler, with_metric_engine),
647 ),
648 ..self
649 }
650 }
651
652 pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self {
653 Self {
654 user_provider: Some(user_provider),
655 ..self
656 }
657 }
658
659 pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self {
660 Self {
661 router: self.router.merge(HttpServer::route_metrics(handler)),
662 ..self
663 }
664 }
665
666 pub fn with_log_ingest_handler(
667 self,
668 handler: PipelineHandlerRef,
669 validator: Option<LogValidatorRef>,
670 ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
671 ) -> Self {
672 let log_state = LogState {
673 log_handler: handler,
674 log_validator: validator,
675 ingest_interceptor,
676 };
677
678 let router = self.router.nest(
679 &format!("/{HTTP_API_VERSION}"),
680 HttpServer::route_pipelines(log_state.clone()),
681 );
682 let router = router.nest(
684 &format!("/{HTTP_API_VERSION}/events"),
685 #[allow(deprecated)]
686 HttpServer::route_log_deprecated(log_state.clone()),
687 );
688
689 let router = router.nest(
690 &format!("/{HTTP_API_VERSION}/loki"),
691 HttpServer::route_loki(log_state.clone()),
692 );
693
694 let router = router.nest(
695 &format!("/{HTTP_API_VERSION}/elasticsearch"),
696 HttpServer::route_elasticsearch(log_state.clone()),
697 );
698
699 let router = router.nest(
700 &format!("/{HTTP_API_VERSION}/elasticsearch/"),
701 Router::new()
702 .route("/", routing::get(elasticsearch::handle_get_version))
703 .with_state(log_state),
704 );
705
706 Self { router, ..self }
707 }
708
709 pub fn with_plugins(self, plugins: Plugins) -> Self {
710 Self { plugins, ..self }
711 }
712
713 pub fn with_greptime_config_options(self, opts: String) -> Self {
714 let config_router = HttpServer::route_config(GreptimeOptionsConfigState {
715 greptime_config_options: opts,
716 });
717
718 Self {
719 router: self.router.merge(config_router),
720 ..self
721 }
722 }
723
724 pub fn with_jaeger_handler(self, handler: JaegerQueryHandlerRef) -> Self {
725 Self {
726 router: self.router.nest(
727 &format!("/{HTTP_API_VERSION}/jaeger"),
728 HttpServer::route_jaeger(handler),
729 ),
730 ..self
731 }
732 }
733
734 pub fn with_extra_router(self, router: Router) -> Self {
735 Self {
736 router: self.router.merge(router),
737 ..self
738 }
739 }
740
741 pub fn add_layer<L>(self, layer: L) -> Self
742 where
743 L: Layer<Route> + Clone + Send + Sync + 'static,
744 L::Service: Service<Request> + Clone + Send + Sync + 'static,
745 <L::Service as Service<Request>>::Response: IntoResponse + 'static,
746 <L::Service as Service<Request>>::Error: Into<Infallible> + 'static,
747 <L::Service as Service<Request>>::Future: Send + 'static,
748 {
749 Self {
750 router: self.router.layer(layer),
751 ..self
752 }
753 }
754
755 pub fn build(self) -> HttpServer {
756 HttpServer {
757 options: self.options,
758 user_provider: self.user_provider,
759 shutdown_tx: Mutex::new(None),
760 plugins: self.plugins,
761 router: StdMutex::new(self.router),
762 bind_addr: None,
763 memory_limiter: self.memory_limiter,
764 }
765 }
766}
767
768impl HttpServer {
769 pub fn make_app(&self) -> Router {
771 let mut router = {
772 let router = self.router.lock().unwrap();
773 router.clone()
774 };
775
776 router = router
777 .route(
778 "/health",
779 routing::get(handler::health).post(handler::health),
780 )
781 .route(
782 &format!("/{HTTP_API_VERSION}/health"),
783 routing::get(handler::health).post(handler::health),
784 )
785 .route(
786 "/ready",
787 routing::get(handler::health).post(handler::health),
788 );
789
790 router = router.route("/status", routing::get(handler::status));
791
792 #[cfg(feature = "dashboard")]
793 {
794 if !self.options.disable_dashboard {
795 info!("Enable dashboard service at '/dashboard'");
796 router = router.route(
798 "/dashboard",
799 routing::get(|uri: axum::http::uri::Uri| async move {
800 let path = uri.path();
801 let query = uri.query().map(|q| format!("?{}", q)).unwrap_or_default();
802
803 let new_uri = format!("{}/{}", path, query);
804 axum::response::Redirect::permanent(&new_uri)
805 }),
806 );
807
808 router = router
812 .route(
813 "/dashboard/",
814 routing::get(dashboard::static_handler).post(dashboard::static_handler),
815 )
816 .route(
817 "/dashboard/{*x}",
818 routing::get(dashboard::static_handler).post(dashboard::static_handler),
819 );
820 }
821 }
822
823 router = router.route_layer(middleware::from_fn(http_metrics_layer));
825
826 router
827 }
828
    /// Wraps `router` with the server-wide middleware stack and mounts the
    /// `/debug` routes.
    ///
    /// The timeout, body-limit and CORS layers are optional and driven by
    /// `self.options`; the trace, memory-limit, auth, hints and read-preference
    /// layers are always installed.
    ///
    /// # Errors
    ///
    /// Fails when one of `cors_allowed_origins` is not a valid header value.
    pub fn build(&self, router: Router) -> Result<Router> {
        // A zero timeout means "no timeout layer".
        let timeout_layer = if self.options.timeout != Duration::default() {
            Some(ServiceBuilder::new().layer(DynamicTimeoutLayer::new(self.options.timeout)))
        } else {
            info!("HTTP server timeout is disabled");
            None
        };
        // A zero body limit means "no body limit layer".
        let body_limit_layer = if self.options.body_limit != ReadableSize(0) {
            Some(
                ServiceBuilder::new()
                    .layer(DefaultBodyLimit::max(self.options.body_limit.0 as usize)),
            )
        } else {
            info!("HTTP server body limit is disabled");
            None
        };
        let cors_layer = if self.options.enable_cors {
            Some(
                CorsLayer::new()
                    .allow_methods([
                        Method::GET,
                        Method::POST,
                        Method::PUT,
                        Method::DELETE,
                        Method::HEAD,
                    ])
                    // No configured origins: allow any; otherwise only the listed ones.
                    .allow_origin(if self.options.cors_allowed_origins.is_empty() {
                        AllowOrigin::from(Any)
                    } else {
                        AllowOrigin::from(
                            self.options
                                .cors_allowed_origins
                                .iter()
                                .map(|s| {
                                    HeaderValue::from_str(s.as_str())
                                        .context(InvalidHeaderValueSnafu)
                                })
                                .collect::<Result<Vec<HeaderValue>>>()?,
                        )
                    })
                    .allow_headers(Any),
            )
        } else {
            info!("HTTP server cross-origin is disabled");
            None
        };

        // NOTE(review): the order of the layers below is significant (see tower's
        // `ServiceBuilder` documentation) — do not reorder without checking.
        Ok(router
            .layer(
                ServiceBuilder::new()
                    .layer(TraceLayer::new_for_http().on_failure(()))
                    .option_layer(cors_layer)
                    .option_layer(timeout_layer)
                    .option_layer(body_limit_layer)
                    .layer(middleware::from_fn_with_state(
                        self.memory_limiter.clone(),
                        memory_limit::memory_limit_middleware,
                    ))
                    .layer(middleware::from_fn_with_state(
                        AuthState::new(self.user_provider.clone()),
                        authorize::check_http_auth,
                    ))
                    .layer(middleware::from_fn(hints::extract_hints))
                    .layer(middleware::from_fn(
                        read_preference::extract_read_preference,
                    )),
            )
            // The debug routes are nested after `.layer(..)`.
            // NOTE(review): per axum's `Router::layer` semantics they therefore do
            // not get the layers above (timeout, auth, ...) — confirm this is intended.
            .nest(
                "/debug",
                Router::new()
                    .route("/log_level", routing::post(dyn_log::dyn_log_handler))
                    .route("/enable_trace", routing::post(dyn_trace::dyn_trace_handler))
                    .nest(
                        "/prof",
                        Router::new()
                            .route("/cpu", routing::post(pprof::pprof_handler))
                            .route("/mem", routing::post(mem_prof::mem_prof_handler))
                            .route("/mem/symbol", routing::post(mem_prof::symbolicate_handler))
                            .route(
                                "/mem/activate",
                                routing::post(mem_prof::activate_heap_prof_handler),
                            )
                            .route(
                                "/mem/deactivate",
                                routing::post(mem_prof::deactivate_heap_prof_handler),
                            )
                            .route(
                                "/mem/status",
                                routing::get(mem_prof::heap_prof_status_handler),
                            )
                            .route(
                                "/mem/gdump",
                                routing::get(mem_prof::gdump_status_handler)
                                    .post(mem_prof::gdump_toggle_handler),
                            ),
                    ),
            ))
    }
936
937 fn route_metrics<S>(metrics_handler: MetricsHandler) -> Router<S> {
938 Router::new()
939 .route("/metrics", routing::get(handler::metrics))
940 .with_state(metrics_handler)
941 }
942
943 fn route_loki<S>(log_state: LogState) -> Router<S> {
944 Router::new()
945 .route("/api/v1/push", routing::post(loki::loki_ingest))
946 .layer(
947 ServiceBuilder::new()
948 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
949 )
950 .with_state(log_state)
951 }
952
953 fn route_elasticsearch<S>(log_state: LogState) -> Router<S> {
954 Router::new()
955 .route(
957 "/",
958 routing::head((HttpStatusCode::OK, elasticsearch::elasticsearch_headers())),
959 )
960 .route("/", routing::get(elasticsearch::handle_get_version))
962 .route("/_license", routing::get(elasticsearch::handle_get_license))
964 .route("/_bulk", routing::post(elasticsearch::handle_bulk_api))
965 .route(
966 "/{index}/_bulk",
967 routing::post(elasticsearch::handle_bulk_api_with_index),
968 )
969 .route(
971 "/_ilm/policy/{*path}",
972 routing::any((
973 HttpStatusCode::OK,
974 elasticsearch::elasticsearch_headers(),
975 axum::Json(serde_json::json!({})),
976 )),
977 )
978 .route(
980 "/_index_template/{*path}",
981 routing::any((
982 HttpStatusCode::OK,
983 elasticsearch::elasticsearch_headers(),
984 axum::Json(serde_json::json!({})),
985 )),
986 )
987 .route(
990 "/_ingest/{*path}",
991 routing::any((
992 HttpStatusCode::OK,
993 elasticsearch::elasticsearch_headers(),
994 axum::Json(serde_json::json!({})),
995 )),
996 )
997 .route(
1000 "/_nodes/{*path}",
1001 routing::any((
1002 HttpStatusCode::OK,
1003 elasticsearch::elasticsearch_headers(),
1004 axum::Json(serde_json::json!({})),
1005 )),
1006 )
1007 .route(
1010 "/logstash/{*path}",
1011 routing::any((
1012 HttpStatusCode::OK,
1013 elasticsearch::elasticsearch_headers(),
1014 axum::Json(serde_json::json!({})),
1015 )),
1016 )
1017 .route(
1018 "/_logstash/{*path}",
1019 routing::any((
1020 HttpStatusCode::OK,
1021 elasticsearch::elasticsearch_headers(),
1022 axum::Json(serde_json::json!({})),
1023 )),
1024 )
1025 .layer(ServiceBuilder::new().layer(RequestDecompressionLayer::new()))
1026 .with_state(log_state)
1027 }
1028
1029 #[deprecated(since = "0.11.0", note = "Use `route_pipelines()` instead.")]
1030 fn route_log_deprecated<S>(log_state: LogState) -> Router<S> {
1031 Router::new()
1032 .route("/logs", routing::post(event::log_ingester))
1033 .route(
1034 "/pipelines/{pipeline_name}",
1035 routing::get(event::query_pipeline),
1036 )
1037 .route(
1038 "/pipelines/{pipeline_name}",
1039 routing::post(event::add_pipeline),
1040 )
1041 .route(
1042 "/pipelines/{pipeline_name}",
1043 routing::delete(event::delete_pipeline),
1044 )
1045 .route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
1046 .layer(
1047 ServiceBuilder::new()
1048 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1049 )
1050 .with_state(log_state)
1051 }
1052
1053 fn route_pipelines<S>(log_state: LogState) -> Router<S> {
1054 Router::new()
1055 .route("/ingest", routing::post(event::log_ingester))
1056 .route(
1057 "/pipelines/{pipeline_name}",
1058 routing::get(event::query_pipeline),
1059 )
1060 .route(
1061 "/pipelines/{pipeline_name}/ddl",
1062 routing::get(event::query_pipeline_ddl),
1063 )
1064 .route(
1065 "/pipelines/{pipeline_name}",
1066 routing::post(event::add_pipeline),
1067 )
1068 .route(
1069 "/pipelines/{pipeline_name}",
1070 routing::delete(event::delete_pipeline),
1071 )
1072 .route("/pipelines/_dryrun", routing::post(event::pipeline_dryrun))
1073 .layer(
1074 ServiceBuilder::new()
1075 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1076 )
1077 .with_state(log_state)
1078 }
1079
1080 fn route_sql<S>(api_state: ApiState) -> Router<S> {
1081 Router::new()
1082 .route("/sql", routing::get(handler::sql).post(handler::sql))
1083 .route(
1084 "/sql/parse",
1085 routing::get(handler::sql_parse).post(handler::sql_parse),
1086 )
1087 .route(
1088 "/sql/format",
1089 routing::get(handler::sql_format).post(handler::sql_format),
1090 )
1091 .route(
1092 "/promql",
1093 routing::get(handler::promql).post(handler::promql),
1094 )
1095 .with_state(api_state)
1096 }
1097
1098 fn route_logs<S>(log_handler: LogQueryHandlerRef) -> Router<S> {
1099 Router::new()
1100 .route("/logs", routing::get(logs::logs).post(logs::logs))
1101 .with_state(log_handler)
1102 }
1103
1104 pub fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
1108 Router::new()
1109 .route(
1110 "/format_query",
1111 routing::post(format_query).get(format_query),
1112 )
1113 .route("/status/buildinfo", routing::get(build_info_query))
1114 .route("/query", routing::post(instant_query).get(instant_query))
1115 .route("/query_range", routing::post(range_query).get(range_query))
1116 .route("/labels", routing::post(labels_query).get(labels_query))
1117 .route("/series", routing::post(series_query).get(series_query))
1118 .route("/parse_query", routing::post(parse_query).get(parse_query))
1119 .route(
1120 "/label/{label_name}/values",
1121 routing::get(label_values_query),
1122 )
1123 .layer(ServiceBuilder::new().layer(CompressionLayer::new()))
1124 .with_state(prometheus_handler)
1125 }
1126
1127 fn route_prom<S>(state: PromStoreState) -> Router<S> {
1133 Router::new()
1134 .route("/read", routing::post(prom_store::remote_read))
1135 .route("/write", routing::post(prom_store::remote_write))
1136 .with_state(state)
1137 }
1138
1139 fn route_influxdb<S>(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
1140 Router::new()
1141 .route("/write", routing::post(influxdb_write_v1))
1142 .route("/api/v2/write", routing::post(influxdb_write_v2))
1143 .layer(
1144 ServiceBuilder::new()
1145 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1146 )
1147 .route("/ping", routing::get(influxdb_ping))
1148 .route("/health", routing::get(influxdb_health))
1149 .with_state(influxdb_handler)
1150 }
1151
1152 fn route_opentsdb<S>(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router<S> {
1153 Router::new()
1154 .route("/api/put", routing::post(opentsdb::put))
1155 .with_state(opentsdb_handler)
1156 }
1157
1158 fn route_otlp<S>(
1159 otlp_handler: OpenTelemetryProtocolHandlerRef,
1160 with_metric_engine: bool,
1161 ) -> Router<S> {
1162 Router::new()
1163 .route("/v1/metrics", routing::post(otlp::metrics))
1164 .route("/v1/traces", routing::post(otlp::traces))
1165 .route("/v1/logs", routing::post(otlp::logs))
1166 .layer(
1167 ServiceBuilder::new()
1168 .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
1169 )
1170 .with_state(OtlpState {
1171 with_metric_engine,
1172 handler: otlp_handler,
1173 })
1174 }
1175
1176 fn route_config<S>(state: GreptimeOptionsConfigState) -> Router<S> {
1177 Router::new()
1178 .route("/config", routing::get(handler::config))
1179 .with_state(state)
1180 }
1181
1182 fn route_jaeger<S>(handler: JaegerQueryHandlerRef) -> Router<S> {
1183 Router::new()
1184 .route("/api/services", routing::get(jaeger::handle_get_services))
1185 .route(
1186 "/api/services/{service_name}/operations",
1187 routing::get(jaeger::handle_get_operations_by_service),
1188 )
1189 .route(
1190 "/api/operations",
1191 routing::get(jaeger::handle_get_operations),
1192 )
1193 .route("/api/traces", routing::get(jaeger::handle_find_traces))
1194 .route(
1195 "/api/traces/{trace_id}",
1196 routing::get(jaeger::handle_get_trace),
1197 )
1198 .with_state(handler)
1199 }
1200}
1201
/// Name returned by `HttpServer`'s `Server::name` implementation.
pub const HTTP_SERVER: &str = "HTTP_SERVER";
1203
#[async_trait]
impl Server for HttpServer {
    /// Signals the serving task to shut down gracefully.
    ///
    /// Takes the stored shutdown sender (if any) and sends on it; a failed send
    /// is only logged. Always returns `Ok(())`.
    async fn shutdown(&self) -> Result<()> {
        let mut shutdown_tx = self.shutdown_tx.lock().await;
        if let Some(tx) = shutdown_tx.take()
            && tx.send(()).is_err()
        {
            info!("Receiver dropped, the HTTP server has already exited");
        }
        info!("Shutdown HTTP server");

        Ok(())
    }

    /// Binds to `listening` and serves the application on a spawned task.
    ///
    /// # Errors
    ///
    /// Fails if the server is already started, if a configured
    /// `HttpConfiguratorRef` plugin or `build` fails, if the address cannot be
    /// bound, or if the local address cannot be read.
    async fn start(&mut self, listening: SocketAddr) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        let serve = {
            // The lock is held for the whole setup so the "already started" check
            // and the store of `tx` happen under the same guard.
            let mut shutdown_tx = self.shutdown_tx.lock().await;
            ensure!(
                shutdown_tx.is_none(),
                AlreadyStartedSnafu { server: "HTTP" }
            );

            let mut app = self.make_app();
            // Let an optional plugin customize the router before the middleware is applied.
            if let Some(configurator) = self.plugins.get::<HttpConfiguratorRef<()>>() {
                app = configurator
                    .configure_http(app, ())
                    .await
                    .context(OtherSnafu)?;
            }
            let app = self.build(app)?;
            let listener = tokio::net::TcpListener::bind(listening)
                .await
                .context(AddressBindSnafu { addr: listening })?
                // Try to enable TCP_NODELAY on each accepted connection; failures are only logged.
                .tap_io(|tcp_stream| {
                    if let Err(e) = tcp_stream.set_nodelay(true) {
                        error!(e; "Failed to set TCP_NODELAY on incoming connection");
                    }
                });
            let serve = axum::serve(listener, app.into_make_service());

            // From here on the server counts as started.
            *shutdown_tx = Some(tx);

            serve
        };
        // NOTE(review): if `local_addr` fails here, `shutdown_tx` stays `Some` and a
        // retried `start` reports "already started" — confirm that is acceptable.
        let listening = serve.local_addr().context(InternalIoSnafu)?;
        info!("HTTP server is bound to {}", listening);

        // Serve in the background until the shutdown channel fires or its sender is dropped.
        common_runtime::spawn_global(async move {
            if let Err(e) = serve
                .with_graceful_shutdown(rx.map(drop))
                .await
                .context(InternalIoSnafu)
            {
                error!(e; "Failed to shutdown http server");
            }
        });

        self.bind_addr = Some(listening);
        Ok(())
    }

    /// Returns the server name, `HTTP_SERVER`.
    fn name(&self) -> &str {
        HTTP_SERVER
    }

    /// Returns the address recorded by `start`, or `None` before a successful start.
    fn bind_addr(&self) -> Option<SocketAddr> {
        self.bind_addr
    }

    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
1294
1295#[cfg(test)]
1296mod test {
1297 use std::future::pending;
1298 use std::io::Cursor;
1299 use std::sync::Arc;
1300
1301 use arrow_ipc::reader::FileReader;
1302 use arrow_schema::DataType;
1303 use axum::handler::Handler;
1304 use axum::http::StatusCode;
1305 use axum::routing::get;
1306 use common_query::Output;
1307 use common_recordbatch::RecordBatches;
1308 use datafusion_expr::LogicalPlan;
1309 use datatypes::prelude::*;
1310 use datatypes::schema::{ColumnSchema, Schema};
1311 use datatypes::vectors::{StringVector, UInt32Vector};
1312 use header::constants::GREPTIME_DB_HEADER_TIMEOUT;
1313 use query::parser::PromQuery;
1314 use query::query_engine::DescribeResult;
1315 use session::context::QueryContextRef;
1316 use sql::statements::statement::Statement;
1317 use tokio::sync::mpsc;
1318 use tokio::time::Instant;
1319
1320 use super::*;
1321 use crate::error::Error;
1322 use crate::http::test_helpers::TestClient;
1323 use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
1324
    /// Stub query handler used to build the test router in this module.
    struct DummyInstance {
        // Never read in this module (hence the underscore); the sender is only stored.
        _tx: mpsc::Sender<(String, Vec<u8>)>,
    }
1328
1329 #[async_trait]
1330 impl SqlQueryHandler for DummyInstance {
1331 type Error = Error;
1332
1333 async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
1334 unimplemented!()
1335 }
1336
1337 async fn do_promql_query(
1338 &self,
1339 _: &PromQuery,
1340 _: QueryContextRef,
1341 ) -> Vec<std::result::Result<Output, Self::Error>> {
1342 unimplemented!()
1343 }
1344
1345 async fn do_exec_plan(
1346 &self,
1347 _stmt: Option<Statement>,
1348 _plan: LogicalPlan,
1349 _query_ctx: QueryContextRef,
1350 ) -> std::result::Result<Output, Self::Error> {
1351 unimplemented!()
1352 }
1353
1354 async fn do_describe(
1355 &self,
1356 _stmt: sql::statements::statement::Statement,
1357 _query_ctx: QueryContextRef,
1358 ) -> Result<Option<DescribeResult>> {
1359 unimplemented!()
1360 }
1361
1362 async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result<bool> {
1363 Ok(true)
1364 }
1365 }
1366
1367 fn timeout() -> DynamicTimeoutLayer {
1368 DynamicTimeoutLayer::new(Duration::from_millis(10))
1369 }
1370
1371 async fn forever() {
1372 pending().await
1373 }
1374
1375 fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
1376 make_test_app_custom(tx, HttpOptions::default())
1377 }
1378
1379 fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
1380 let instance = Arc::new(DummyInstance { _tx: tx });
1381 let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
1382 let server = HttpServerBuilder::new(options)
1383 .with_sql_handler(sql_instance)
1384 .build();
1385 server.build(server.make_app()).unwrap().route(
1386 "/test/timeout",
1387 get(forever.layer(ServiceBuilder::new().layer(timeout()))),
1388 )
1389 }
1390
1391 #[tokio::test]
1392 pub async fn test_cors() {
1393 let (tx, _rx) = mpsc::channel(100);
1395 let app = make_test_app(tx);
1396 let client = TestClient::new(app).await;
1397
1398 let res = client.get("/health").send().await;
1399
1400 assert_eq!(res.status(), StatusCode::OK);
1401 assert_eq!(
1402 res.headers()
1403 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1404 .expect("expect cors header origin"),
1405 "*"
1406 );
1407
1408 let res = client.get("/v1/health").send().await;
1409
1410 assert_eq!(res.status(), StatusCode::OK);
1411 assert_eq!(
1412 res.headers()
1413 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1414 .expect("expect cors header origin"),
1415 "*"
1416 );
1417
1418 let res = client
1419 .options("/health")
1420 .header("Access-Control-Request-Headers", "x-greptime-auth")
1421 .header("Access-Control-Request-Method", "DELETE")
1422 .header("Origin", "https://example.com")
1423 .send()
1424 .await;
1425 assert_eq!(res.status(), StatusCode::OK);
1426 assert_eq!(
1427 res.headers()
1428 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1429 .expect("expect cors header origin"),
1430 "*"
1431 );
1432 assert_eq!(
1433 res.headers()
1434 .get(http::header::ACCESS_CONTROL_ALLOW_HEADERS)
1435 .expect("expect cors header headers"),
1436 "*"
1437 );
1438 assert_eq!(
1439 res.headers()
1440 .get(http::header::ACCESS_CONTROL_ALLOW_METHODS)
1441 .expect("expect cors header methods"),
1442 "GET,POST,PUT,DELETE,HEAD"
1443 );
1444 }
1445
1446 #[tokio::test]
1447 pub async fn test_cors_custom_origins() {
1448 let (tx, _rx) = mpsc::channel(100);
1450 let origin = "https://example.com";
1451
1452 let options = HttpOptions {
1453 cors_allowed_origins: vec![origin.to_string()],
1454 ..Default::default()
1455 };
1456
1457 let app = make_test_app_custom(tx, options);
1458 let client = TestClient::new(app).await;
1459
1460 let res = client.get("/health").header("Origin", origin).send().await;
1461
1462 assert_eq!(res.status(), StatusCode::OK);
1463 assert_eq!(
1464 res.headers()
1465 .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1466 .expect("expect cors header origin"),
1467 origin
1468 );
1469
1470 let res = client
1471 .get("/health")
1472 .header("Origin", "https://notallowed.com")
1473 .send()
1474 .await;
1475
1476 assert_eq!(res.status(), StatusCode::OK);
1477 assert!(
1478 !res.headers()
1479 .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1480 );
1481 }
1482
1483 #[tokio::test]
1484 pub async fn test_cors_disabled() {
1485 let (tx, _rx) = mpsc::channel(100);
1487
1488 let options = HttpOptions {
1489 enable_cors: false,
1490 ..Default::default()
1491 };
1492
1493 let app = make_test_app_custom(tx, options);
1494 let client = TestClient::new(app).await;
1495
1496 let res = client.get("/health").send().await;
1497
1498 assert_eq!(res.status(), StatusCode::OK);
1499 assert!(
1500 !res.headers()
1501 .contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
1502 );
1503 }
1504
1505 #[test]
1506 fn test_http_options_default() {
1507 let default = HttpOptions::default();
1508 assert_eq!("127.0.0.1:4000".to_string(), default.addr);
1509 assert_eq!(Duration::from_secs(0), default.timeout)
1510 }
1511
1512 #[tokio::test]
1513 async fn test_http_server_request_timeout() {
1514 common_telemetry::init_default_ut_logging();
1515
1516 let (tx, _rx) = mpsc::channel(100);
1517 let app = make_test_app(tx);
1518 let client = TestClient::new(app).await;
1519 let res = client.get("/test/timeout").send().await;
1520 assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1521
1522 let now = Instant::now();
1523 let res = client
1524 .get("/test/timeout")
1525 .header(GREPTIME_DB_HEADER_TIMEOUT, "20ms")
1526 .send()
1527 .await;
1528 assert_eq!(res.status(), StatusCode::REQUEST_TIMEOUT);
1529 let elapsed = now.elapsed();
1530 assert!(elapsed > Duration::from_millis(15));
1531
1532 tokio::time::timeout(
1533 Duration::from_millis(15),
1534 client
1535 .get("/test/timeout")
1536 .header(GREPTIME_DB_HEADER_TIMEOUT, "0s")
1537 .send(),
1538 )
1539 .await
1540 .unwrap_err();
1541
1542 tokio::time::timeout(
1543 Duration::from_millis(15),
1544 client
1545 .get("/test/timeout")
1546 .header(
1547 GREPTIME_DB_HEADER_TIMEOUT,
1548 humantime::format_duration(Duration::default()).to_string(),
1549 )
1550 .send(),
1551 )
1552 .await
1553 .unwrap_err();
1554 }
1555
1556 #[tokio::test]
1557 async fn test_schema_for_empty_response() {
1558 let column_schemas = vec![
1559 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1560 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1561 ];
1562 let schema = Arc::new(Schema::new(column_schemas));
1563
1564 let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap();
1565 let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1566
1567 let json_resp = GreptimedbV1Response::from_output(outputs).await;
1568 if let HttpResponse::GreptimedbV1(json_resp) = json_resp {
1569 let json_output = &json_resp.output[0];
1570 if let GreptimeQueryOutput::Records(r) = json_output {
1571 assert_eq!(r.num_rows(), 0);
1572 assert_eq!(r.num_cols(), 2);
1573 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1574 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1575 } else {
1576 panic!("invalid output type");
1577 }
1578 } else {
1579 panic!("invalid format")
1580 }
1581 }
1582
1583 #[tokio::test]
1584 async fn test_recordbatches_conversion() {
1585 let column_schemas = vec![
1586 ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
1587 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1588 ];
1589 let schema = Arc::new(Schema::new(column_schemas));
1590 let columns: Vec<VectorRef> = vec![
1591 Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
1592 Arc::new(StringVector::from(vec![
1593 None,
1594 Some("hello"),
1595 Some("greptime"),
1596 None,
1597 ])),
1598 ];
1599 let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
1600
1601 for format in [
1602 ResponseFormat::GreptimedbV1,
1603 ResponseFormat::InfluxdbV1,
1604 ResponseFormat::Csv(true, true),
1605 ResponseFormat::Table,
1606 ResponseFormat::Arrow,
1607 ResponseFormat::Json,
1608 ResponseFormat::Null,
1609 ] {
1610 let recordbatches =
1611 RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
1612 let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
1613 let json_resp = match format {
1614 ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
1615 ResponseFormat::Csv(with_names, with_types) => {
1616 CsvResponse::from_output(outputs, with_names, with_types).await
1617 }
1618 ResponseFormat::Table => TableResponse::from_output(outputs).await,
1619 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
1620 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
1621 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
1622 ResponseFormat::Null => NullResponse::from_output(outputs).await,
1623 };
1624
1625 match json_resp {
1626 HttpResponse::GreptimedbV1(resp) => {
1627 let json_output = &resp.output[0];
1628 if let GreptimeQueryOutput::Records(r) = json_output {
1629 assert_eq!(r.num_rows(), 4);
1630 assert_eq!(r.num_cols(), 2);
1631 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1632 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1633 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1634 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1635 } else {
1636 panic!("invalid output type");
1637 }
1638 }
1639 HttpResponse::InfluxdbV1(resp) => {
1640 let json_output = &resp.results()[0];
1641 assert_eq!(json_output.num_rows(), 4);
1642 assert_eq!(json_output.num_cols(), 2);
1643 assert_eq!(json_output.series[0].columns.clone()[0], "numbers");
1644 assert_eq!(
1645 json_output.series[0].values[0][0],
1646 serde_json::Value::from(1)
1647 );
1648 assert_eq!(json_output.series[0].values[0][1], serde_json::Value::Null);
1649 }
1650 HttpResponse::Csv(resp) => {
1651 let output = &resp.output()[0];
1652 if let GreptimeQueryOutput::Records(r) = output {
1653 assert_eq!(r.num_rows(), 4);
1654 assert_eq!(r.num_cols(), 2);
1655 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1656 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1657 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1658 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1659 } else {
1660 panic!("invalid output type");
1661 }
1662 }
1663
1664 HttpResponse::Table(resp) => {
1665 let output = &resp.output()[0];
1666 if let GreptimeQueryOutput::Records(r) = output {
1667 assert_eq!(r.num_rows(), 4);
1668 assert_eq!(r.num_cols(), 2);
1669 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1670 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1671 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1672 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1673 } else {
1674 panic!("invalid output type");
1675 }
1676 }
1677
1678 HttpResponse::Arrow(resp) => {
1679 let output = resp.data;
1680 let mut reader =
1681 FileReader::try_new(Cursor::new(output), None).expect("Arrow reader error");
1682 let schema = reader.schema();
1683 assert_eq!(schema.fields[0].name(), "numbers");
1684 assert_eq!(schema.fields[0].data_type(), &DataType::UInt32);
1685 assert_eq!(schema.fields[1].name(), "strings");
1686 assert_eq!(schema.fields[1].data_type(), &DataType::Utf8);
1687
1688 let rb = reader.next().unwrap().expect("read record batch failed");
1689 assert_eq!(rb.num_columns(), 2);
1690 assert_eq!(rb.num_rows(), 4);
1691 }
1692
1693 HttpResponse::Json(resp) => {
1694 let output = &resp.output()[0];
1695 if let GreptimeQueryOutput::Records(r) = output {
1696 assert_eq!(r.num_rows(), 4);
1697 assert_eq!(r.num_cols(), 2);
1698 assert_eq!(r.schema.column_schemas[0].name, "numbers");
1699 assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
1700 assert_eq!(r.rows[0][0], serde_json::Value::from(1));
1701 assert_eq!(r.rows[0][1], serde_json::Value::Null);
1702 } else {
1703 panic!("invalid output type");
1704 }
1705 }
1706
1707 HttpResponse::Null(resp) => {
1708 assert_eq!(resp.rows(), 4);
1709 }
1710
1711 HttpResponse::Error(err) => unreachable!("{err:?}"),
1712 }
1713 }
1714 }
1715
1716 #[test]
1717 fn test_response_format_misc() {
1718 assert_eq!(ResponseFormat::default(), ResponseFormat::GreptimedbV1);
1719 assert_eq!(ResponseFormat::parse("arrow"), Some(ResponseFormat::Arrow));
1720 assert_eq!(
1721 ResponseFormat::parse("csv"),
1722 Some(ResponseFormat::Csv(false, false))
1723 );
1724 assert_eq!(
1725 ResponseFormat::parse("csvwithnames"),
1726 Some(ResponseFormat::Csv(true, false))
1727 );
1728 assert_eq!(
1729 ResponseFormat::parse("csvwithnamesandtypes"),
1730 Some(ResponseFormat::Csv(true, true))
1731 );
1732 assert_eq!(ResponseFormat::parse("table"), Some(ResponseFormat::Table));
1733 assert_eq!(
1734 ResponseFormat::parse("greptimedb_v1"),
1735 Some(ResponseFormat::GreptimedbV1)
1736 );
1737 assert_eq!(
1738 ResponseFormat::parse("influxdb_v1"),
1739 Some(ResponseFormat::InfluxdbV1)
1740 );
1741 assert_eq!(ResponseFormat::parse("json"), Some(ResponseFormat::Json));
1742 assert_eq!(ResponseFormat::parse("null"), Some(ResponseFormat::Null));
1743
1744 assert_eq!(ResponseFormat::parse("invalid"), None);
1746 assert_eq!(ResponseFormat::parse(""), None);
1747 assert_eq!(ResponseFormat::parse("CSV"), None); assert_eq!(ResponseFormat::Arrow.as_str(), "arrow");
1751 assert_eq!(ResponseFormat::Csv(false, false).as_str(), "csv");
1752 assert_eq!(ResponseFormat::Csv(true, true).as_str(), "csv");
1753 assert_eq!(ResponseFormat::Table.as_str(), "table");
1754 assert_eq!(ResponseFormat::GreptimedbV1.as_str(), "greptimedb_v1");
1755 assert_eq!(ResponseFormat::InfluxdbV1.as_str(), "influxdb_v1");
1756 assert_eq!(ResponseFormat::Json.as_str(), "json");
1757 assert_eq!(ResponseFormat::Null.as_str(), "null");
1758 assert_eq!(ResponseFormat::default().as_str(), "greptimedb_v1");
1759 }
1760}