use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use axum::extract::{Json, Query, State};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_plugins::GREPTIME_EXEC_WRITE_COST;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::tracing;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::ResultExt;
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;

use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
use crate::http::header::collect_plan_metrics;
use crate::http::result::arrow_result::ArrowResponse;
use crate::http::result::csv_result::CsvResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
use crate::http::result::json_result::JsonResponse;
use crate::http::result::table_result::TableResponse;
use crate::http::{
    ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
    HttpResponse, ResponseFormat,
};
use crate::metrics_handler::MetricsHandler;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;

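/// Parameters accepted by the HTTP SQL endpoint, deserialized from either the
/// URL query string or an urlencoded form body. All fields are optional;
/// `sql` is validated in the handler, and `format`/`epoch` fall back to
/// defaults when absent or unrecognized.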
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    pub db: Option<String>,
    pub sql: Option<String>,
    pub format: Option<String>,
    pub epoch: Option<String>,
    pub limit: Option<usize>,
    pub compression: Option<String>,
}

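/// Handler for SQL queries over HTTP. Parameters from the query string take
/// precedence over the form body. The optional `db` parameter selects the
/// target catalog and schema, which are validated before execution; results
/// are serialized in the requested `format` (GreptimeDB v1 by default).
///
/// A request might look like the following (the mount path is illustrative;
/// routing is configured elsewhere):
///
/// ```text
/// POST /v1/sql?db=public&format=csv
/// sql=SELECT host, cpu FROM monitor LIMIT 10
/// ```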
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
pub async fn sql(
    State(state): State<ApiState>,
    Query(query_params): Query<SqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SqlQuery>,
) -> HttpResponse {
    let start = Instant::now();
    let sql_handler = &state.sql_handler;
    if let Some(db) = &query_params.db.or(form_params.db) {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        query_ctx.set_current_catalog(&catalog);
        query_ctx.set_current_schema(&schema);
    }
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let sql = query_params.sql.or(form_params.sql);
    let format = query_params
        .format
        .or(form_params.format)
        .map(|s| s.to_lowercase())
        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
        .unwrap_or(ResponseFormat::GreptimedbV1);
    let epoch = query_params
        .epoch
        .or(form_params.epoch)
        .map(|s| s.to_lowercase())
        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));

    let result = if let Some(sql) = &sql {
        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
            Err((status, msg))
        } else {
            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
        }
    } else {
        Err((
            StatusCode::InvalidArguments,
            "sql parameter is required.".to_string(),
        ))
    };

    let outputs = match result {
        Err((status, msg)) => {
            return HttpResponse::Error(
                ErrorResponse::from_error_message(status, msg)
                    .with_execution_time(start.elapsed().as_millis() as u64),
            );
        }
        Ok(outputs) => outputs,
    };

    let mut resp = match format {
        ResponseFormat::Arrow => {
            ArrowResponse::from_output(outputs, query_params.compression).await
        }
        ResponseFormat::Csv(with_names, with_types) => {
            CsvResponse::from_output(outputs, with_names, with_types).await
        }
        ResponseFormat::Table => TableResponse::from_output(outputs).await,
        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
    };

    if let Some(limit) = query_params.limit {
        resp = resp.with_limit(limit);
    }
    resp.with_execution_time(start.elapsed().as_millis() as u64)
}

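/// Parses the given SQL with the GreptimeDB dialect and returns the statement
/// list as JSON without executing anything, which makes it suitable for
/// validating a query up front.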
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
pub async fn sql_parse(
    Query(query_params): Query<SqlQuery>,
    Form(form_params): Form<SqlQuery>,
) -> Result<Json<Vec<Statement>>> {
    let Some(sql) = query_params.sql.or(form_params.sql) else {
        return InvalidQuerySnafu {
            reason: "sql parameter is required.",
        }
        .fail();
    };

    let stmts =
        ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
            .context(FailedToParseQuerySnafu)?;

    Ok(stmts.into())
}

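/// Collects a batch of query outputs into `GreptimeQueryOutput` values plus a
/// map of execution metrics merged across all outputs. Streamed results are
/// fully drained into memory here, and the first failed output aborts the
/// whole batch with an `ErrorResponse`.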
pub async fn from_output(
    outputs: Vec<crate::error::Result<Output>>,
) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
    let mut results = Vec::with_capacity(outputs.len());
    let mut merge_map = HashMap::new();

    for out in outputs {
        match out {
            Ok(o) => match o.data {
                OutputData::AffectedRows(rows) => {
                    results.push(GreptimeQueryOutput::AffectedRows(rows));
                    if o.meta.cost > 0 {
                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
                    }
                }
                OutputData::Stream(stream) => {
                    let schema = stream.schema().clone();
                    let mut http_record_output = match util::collect(stream).await {
                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                            Ok(rows) => rows,
                            Err(err) => {
                                return Err(ErrorResponse::from_error(err));
                            }
                        },
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    };
                    if let Some(physical_plan) = o.meta.plan {
                        let mut result_map = HashMap::new();

                        let mut tmp = vec![&mut merge_map, &mut result_map];
                        collect_plan_metrics(&physical_plan, &mut tmp);
                        let re = result_map
                            .into_iter()
                            .map(|(k, v)| (k, Value::from(v)))
                            .collect::<HashMap<String, Value>>();
                        http_record_output.metrics.extend(re);
                    }
                    results.push(GreptimeQueryOutput::Records(http_record_output))
                }
                OutputData::RecordBatches(rbs) => {
                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
                        Ok(rows) => {
                            results.push(GreptimeQueryOutput::Records(rows));
                        }
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    }
                }
            },

            Err(err) => {
                return Err(ErrorResponse::from_error(err));
            }
        }
    }

    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();

    Ok((results, merge_map))
}

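/// Parameters accepted by the PromQL endpoint. `query`, `start`, `end` and
/// `step` follow the Prometheus range-query conventions; `lookback` defaults
/// to `DEFAULT_LOOKBACK_STRING` when unset (see the `From` impl below).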
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    pub query: String,
    pub start: String,
    pub end: String,
    pub step: String,
    pub lookback: Option<String>,
    pub db: Option<String>,
    pub format: Option<String>,
    pub compression: Option<String>,
    pub epoch: Option<String>,
}

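// Conversion into the query-engine representation; only `lookback` needs a
// default, the remaining fields map across one-to-one.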
impl From<PromqlQuery> for PromQuery {
    fn from(query: PromqlQuery) -> Self {
        PromQuery {
            query: query.query,
            start: query.start,
            end: query.end,
            step: query.step,
            lookback: query
                .lookback
                .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        }
    }
}

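/// Handler for PromQL range queries over HTTP, mirroring the flow of [`sql`]:
/// validate the schema, run the query, then serialize in the requested format.
///
/// A request might look like the following (the mount path is illustrative;
/// routing is configured elsewhere):
///
/// ```text
/// GET /v1/promql?query=sum(cpu)&start=1676738180&end=1676738780&step=10s
/// ```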
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
pub async fn promql(
    State(state): State<ApiState>,
    Query(params): Query<PromqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
) -> Response {
    let sql_handler = &state.sql_handler;
    let exec_start = Instant::now();
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let resp = if let Some((status, msg)) =
        validate_schema(sql_handler.clone(), query_ctx.clone()).await
    {
        let resp = ErrorResponse::from_error_message(status, msg);
        HttpResponse::Error(resp)
    } else {
        let format = params
            .format
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
            .unwrap_or(ResponseFormat::GreptimedbV1);
        let epoch = params
            .epoch
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
        let compression = params.compression.clone();

        let prom_query = params.into();
        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;

        match format {
            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
            ResponseFormat::Csv(with_names, with_types) => {
                CsvResponse::from_output(outputs, with_names, with_types).await
            }
            ResponseFormat::Table => TableResponse::from_output(outputs).await,
            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
        }
    };

    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
        .into_response()
}

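/// Handler for the metrics scrape endpoint: refreshes the jemalloc collector
/// first where available (non-Windows builds only), then renders the
/// registered process metrics as a string.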
#[axum_macros::debug_handler]
pub async fn metrics(
    State(state): State<MetricsHandler>,
    Query(_params): Query<HashMap<String, String>>,
) -> String {
    #[cfg(not(windows))]
    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
        if let Err(e) = c.update() {
            common_telemetry::error!(e; "Failed to update jemalloc metrics");
        }
    }
    state.render()
}

#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}

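/// Handler for the health-check endpoint; answers any request with an empty
/// JSON object, intended for cheap liveness probes.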
#[axum_macros::debug_handler]
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
    Json(HealthResponse {})
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    pub source_time: &'a str,
    pub commit: &'a str,
    pub branch: &'a str,
    pub rustc_version: &'a str,
    pub hostname: String,
    pub version: &'a str,
}

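/// Handler for the status endpoint: reports build metadata (source time,
/// commit, branch, rustc version, server version) together with the hostname,
/// falling back to "unknown" when the hostname cannot be resolved.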
#[axum_macros::debug_handler]
pub async fn status() -> Json<StatusResponse<'static>> {
    let hostname = hostname::get()
        .map(|s| s.to_string_lossy().to_string())
        .unwrap_or_else(|_| "unknown".to_string());
    let build_info = common_version::build_info();
    Json(StatusResponse {
        source_time: build_info.source_time,
        commit: build_info.commit,
        branch: build_info.branch,
        rustc_version: build_info.rustc,
        hostname,
        version: build_info.version,
    })
}

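/// Handler that returns the effective server configuration as stored in
/// `GreptimeOptionsConfigState`, with a 200 status.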
#[axum_macros::debug_handler]
pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
}

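/// Checks that the catalog/schema in `query_ctx` exists. Returns `None` when
/// the schema is valid, and `Some((status, message))` describing the failure
/// otherwise, so callers can turn it directly into an error response.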
async fn validate_schema(
    sql_handler: ServerSqlQueryHandlerRef,
    query_ctx: QueryContextRef,
) -> Option<(StatusCode, String)> {
    match sql_handler
        .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
        .await
    {
        Ok(true) => None,
        Ok(false) => Some((
            StatusCode::DatabaseNotFound,
            format!("Database not found: {}", query_ctx.get_db_string()),
        )),
        Err(e) => Some((
            StatusCode::Internal,
            format!(
                "Error checking database: {}, {}",
                query_ctx.get_db_string(),
                e.output_msg(),
            ),
        )),
    }
}