1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Json, Query, State};
20use axum::response::{IntoResponse, Response};
21use axum::{Extension, Form};
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_plugins::GREPTIME_EXEC_WRITE_COST;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::tracing;
29use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery};
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use session::context::{Channel, QueryContext, QueryContextRef};
33use snafu::ResultExt;
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::{ParseOptions, ParserContext};
36use sql::statements::statement::Statement;
37
38use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
39use crate::http::header::collect_plan_metrics;
40use crate::http::result::arrow_result::ArrowResponse;
41use crate::http::result::csv_result::CsvResponse;
42use crate::http::result::error_result::ErrorResponse;
43use crate::http::result::greptime_result_v1::GreptimedbV1Response;
44use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
45use crate::http::result::json_result::JsonResponse;
46use crate::http::result::null_result::NullResponse;
47use crate::http::result::table_result::TableResponse;
48use crate::http::{
49 ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
50 HttpResponse, ResponseFormat,
51};
52use crate::metrics_handler::MetricsHandler;
53use crate::query_handler::sql::ServerSqlQueryHandlerRef;
54
/// Query parameters accepted by the HTTP SQL endpoints (`/sql`, `/sql/parse`,
/// `/sql/format`). Every field may arrive via either the URL query string or
/// the form body; the handlers prefer the query-string value.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    /// Target database string, parsed into catalog and schema by
    /// `parse_catalog_and_schema_from_db_string`.
    pub db: Option<String>,
    /// SQL statement(s) to execute. Required by the handlers; its absence
    /// yields an `InvalidArguments` error response.
    pub sql: Option<String>,
    /// Response format name (case-insensitive). Unrecognized values fall back
    /// to the GreptimeDB v1 format.
    pub format: Option<String>,
    /// Timestamp precision used by the InfluxDB v1 response format.
    /// Unrecognized values fall back to milliseconds.
    pub epoch: Option<String>,
    /// Maximum number of rows to keep in the response.
    pub limit: Option<usize>,
    /// Compression codec, forwarded to the Arrow response format.
    pub compression: Option<String>,
}
76
/// HTTP handler for `/sql`: executes one or more SQL statements and renders
/// the outputs in the requested response format.
///
/// Parameters may come from the query string or the form body; the query
/// string takes precedence field-by-field. Returns an error response when the
/// `sql` parameter is missing or the target schema does not exist.
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
pub async fn sql(
    State(state): State<ApiState>,
    Query(query_params): Query<SqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SqlQuery>,
) -> HttpResponse {
    let start = Instant::now();
    let sql_handler = &state.sql_handler;
    // An explicit `db` parameter overrides the catalog/schema carried by the
    // request's query context.
    if let Some(db) = &query_params.db.or(form_params.db) {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        query_ctx.set_current_catalog(&catalog);
        query_ctx.set_current_schema(&schema);
    }
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::HttpSql);
    let query_ctx = Arc::new(query_ctx);

    // Histogram timer observes on drop, so it covers everything below,
    // including query execution and response formatting.
    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let sql = query_params.sql.or(form_params.sql);
    // Unknown format names silently fall back to GreptimeDB v1 rather than
    // rejecting the request.
    let format = query_params
        .format
        .or(form_params.format)
        .map(|s| s.to_lowercase())
        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
        .unwrap_or(ResponseFormat::GreptimedbV1);
    // Unknown epoch names silently fall back to millisecond precision.
    let epoch = query_params
        .epoch
        .or(form_params.epoch)
        .map(|s| s.to_lowercase())
        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));

    // Validate that the target schema exists before running the query.
    let result = if let Some(sql) = &sql {
        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
            Err((status, msg))
        } else {
            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
        }
    } else {
        Err((
            StatusCode::InvalidArguments,
            "sql parameter is required.".to_string(),
        ))
    };

    let outputs = match result {
        Err((status, msg)) => {
            // Even error responses report the elapsed handler time.
            return HttpResponse::Error(
                ErrorResponse::from_error_message(status, msg)
                    .with_execution_time(start.elapsed().as_millis() as u64),
            );
        }
        Ok(outputs) => outputs,
    };

    // Render the outputs with the format-specific response builder.
    let mut resp = match format {
        ResponseFormat::Arrow => {
            ArrowResponse::from_output(outputs, query_params.compression).await
        }
        ResponseFormat::Csv(with_names, with_types) => {
            CsvResponse::from_output(outputs, with_names, with_types).await
        }
        ResponseFormat::Table => TableResponse::from_output(outputs).await,
        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
        ResponseFormat::Null => NullResponse::from_output(outputs).await,
    };

    // NOTE(review): only the query-string `limit` is honored here (the form
    // body's `limit` is ignored) — confirm this asymmetry is intended.
    if let Some(limit) = query_params.limit {
        resp = resp.with_limit(limit);
    }
    resp.with_execution_time(start.elapsed().as_millis() as u64)
}
157
158#[axum_macros::debug_handler]
160#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
161pub async fn sql_parse(
162 Query(query_params): Query<SqlQuery>,
163 Form(form_params): Form<SqlQuery>,
164) -> Result<Json<Vec<Statement>>> {
165 let Some(sql) = query_params.sql.or(form_params.sql) else {
166 return InvalidQuerySnafu {
167 reason: "sql parameter is required.",
168 }
169 .fail();
170 };
171
172 let stmts =
173 ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
174 .context(FailedToParseQuerySnafu)?;
175
176 Ok(stmts.into())
177}
178
/// Response body of the `/sql/format` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlFormatResponse {
    /// All input statements pretty-printed, each terminated with `;` and
    /// joined by newlines.
    pub formatted: String,
}
183
184#[axum_macros::debug_handler]
186#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql_format"))]
187pub async fn sql_format(
188 Query(query_params): Query<SqlQuery>,
189 Form(form_params): Form<SqlQuery>,
190) -> axum::response::Response {
191 let Some(sql) = query_params.sql.or(form_params.sql) else {
192 let resp = ErrorResponse::from_error_message(
193 StatusCode::InvalidArguments,
194 "sql parameter is required.".to_string(),
195 );
196 return HttpResponse::Error(resp).into_response();
197 };
198
199 let stmts = match ParserContext::create_with_dialect(
201 &sql,
202 &GreptimeDbDialect {},
203 ParseOptions::default(),
204 ) {
205 Ok(v) => v,
206 Err(e) => return HttpResponse::Error(ErrorResponse::from_error(e)).into_response(),
207 };
208
209 let mut parts: Vec<String> = Vec::with_capacity(stmts.len());
210 for stmt in stmts {
211 let mut s = format!("{:#}", stmt);
212 if !s.trim_end().ends_with(';') {
213 s.push(';');
214 }
215 parts.push(s);
216 }
217
218 let formatted = parts.join("\n");
219 Json(SqlFormatResponse { formatted }).into_response()
220}
221
/// Converts raw query outputs into HTTP-serializable results plus a map of
/// metrics merged across all outputs.
///
/// Returns the per-output results and the merged metrics map, or the first
/// error encountered (wrapped in an [`ErrorResponse`]); remaining outputs are
/// not processed after an error.
pub async fn from_output(
    outputs: Vec<crate::error::Result<Output>>,
) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
    let mut results = Vec::with_capacity(outputs.len());
    let mut merge_map = HashMap::new();

    for out in outputs {
        match out {
            Ok(o) => match o.data {
                OutputData::AffectedRows(rows) => {
                    results.push(GreptimeQueryOutput::AffectedRows(rows));
                    // Record the write cost for this output.
                    // NOTE(review): a later output with a cost overwrites (not
                    // accumulates) this entry — confirm that is intended when a
                    // request carries multiple write statements.
                    if o.meta.cost > 0 {
                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
                    }
                }
                OutputData::Stream(stream) => {
                    // Capture the schema before the stream is consumed.
                    let schema = stream.schema().clone();
                    // Drain the whole stream into record batches in memory.
                    let mut http_record_output = match util::collect(stream).await {
                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                            Ok(rows) => rows,
                            Err(err) => {
                                return Err(ErrorResponse::from_error(err));
                            }
                        },
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    };
                    if let Some(physical_plan) = o.meta.plan {
                        let mut result_map = HashMap::new();

                        // Collect plan metrics into both the cross-output merge
                        // map and this output's own metrics map.
                        let mut tmp = vec![&mut merge_map, &mut result_map];
                        collect_plan_metrics(&physical_plan, &mut tmp);
                        let re = result_map
                            .into_iter()
                            .map(|(k, v)| (k, Value::from(v)))
                            .collect::<HashMap<String, Value>>();
                        http_record_output.metrics.extend(re);
                    }
                    results.push(GreptimeQueryOutput::Records(http_record_output))
                }
                OutputData::RecordBatches(rbs) => {
                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
                        Ok(rows) => {
                            results.push(GreptimeQueryOutput::Records(rows));
                        }
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    }
                }
            },

            Err(err) => {
                return Err(ErrorResponse::from_error(err));
            }
        }
    }

    // Convert the merged numeric metrics into JSON values for serialization.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();

    Ok((results, merge_map))
}
292
/// Query parameters accepted by the HTTP PromQL endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    /// The PromQL expression to evaluate.
    pub query: String,
    /// Range start, forwarded verbatim to [`PromQuery`].
    pub start: String,
    /// Range end, forwarded verbatim to [`PromQuery`].
    pub end: String,
    /// Evaluation step, forwarded verbatim to [`PromQuery`].
    pub step: String,
    /// Lookback delta; defaults to `DEFAULT_LOOKBACK_STRING` when absent.
    pub lookback: Option<String>,
    /// Target database string.
    pub db: Option<String>,
    /// Response format name (case-insensitive). Unrecognized values fall back
    /// to the GreptimeDB v1 format.
    pub format: Option<String>,
    /// Compression codec, forwarded to the Arrow response format.
    pub compression: Option<String>,
    /// Timestamp precision for the InfluxDB v1 response format.
    pub epoch: Option<String>,
}
319
320impl From<PromqlQuery> for PromQuery {
321 fn from(query: PromqlQuery) -> Self {
322 PromQuery {
323 query: query.query,
324 start: query.start,
325 end: query.end,
326 step: query.step,
327 lookback: query
328 .lookback
329 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
330 alias: None,
332 }
333 }
334}
335
/// HTTP handler for the PromQL endpoint: evaluates a PromQL range query and
/// renders the outputs in the requested response format.
///
/// Validates that the target schema exists before executing; on validation
/// failure an error response is returned instead of running the query.
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
pub async fn promql(
    State(state): State<ApiState>,
    Query(params): Query<PromqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
) -> Response {
    let sql_handler = &state.sql_handler;
    let exec_start = Instant::now();
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Promql);
    let query_ctx = Arc::new(query_ctx);

    // Histogram timer observes on drop, so it covers validation, execution,
    // and response formatting below.
    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let resp = if let Some((status, msg)) =
        validate_schema(sql_handler.clone(), query_ctx.clone()).await
    {
        let resp = ErrorResponse::from_error_message(status, msg);
        HttpResponse::Error(resp)
    } else {
        // Unknown format/epoch names silently fall back to the defaults
        // (GreptimeDB v1 format, millisecond precision).
        let format = params
            .format
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
            .unwrap_or(ResponseFormat::GreptimedbV1);
        let epoch = params
            .epoch
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
        // Clone before `params` is consumed by the `into()` conversion below.
        let compression = params.compression.clone();

        let prom_query = params.into();
        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;

        match format {
            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
            ResponseFormat::Csv(with_names, with_types) => {
                CsvResponse::from_output(outputs, with_names, with_types).await
            }
            ResponseFormat::Table => TableResponse::from_output(outputs).await,
            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
            ResponseFormat::Null => NullResponse::from_output(outputs).await,
        }
    };

    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
        .into_response()
}
393
/// HTTP handler for the metrics endpoint: renders the process's collected
/// metrics via the [`MetricsHandler`].
#[axum_macros::debug_handler]
pub async fn metrics(
    State(state): State<MetricsHandler>,
    Query(_params): Query<HashMap<String, String>>,
) -> String {
    // jemalloc statistics are only collected on non-Windows targets; a failed
    // update is logged and does not fail the scrape.
    #[cfg(not(windows))]
    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref()
        && let Err(e) = c.update()
    {
        common_telemetry::error!(e; "Failed to update jemalloc metrics");
    }
    state.render()
}
412
/// Query parameters of the health endpoint (currently none).
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}
415
/// Response body of the health endpoint (an empty JSON object).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}
418
419#[axum_macros::debug_handler]
423pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
424 Json(HealthResponse {})
425}
426
/// Response body of the status endpoint, describing the running build.
/// Borrowed fields reference `'static` build-info strings baked in at compile
/// time; `hostname` is resolved at request time.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    /// Source timestamp of the build.
    pub source_time: &'a str,
    /// Git commit the binary was built from.
    pub commit: &'a str,
    /// Git branch the binary was built from.
    pub branch: &'a str,
    /// Version of rustc used for the build.
    pub rustc_version: &'a str,
    /// Hostname of the machine serving the request.
    pub hostname: String,
    /// Package version string.
    pub version: &'a str,
}
436
437#[axum_macros::debug_handler]
439pub async fn status() -> Json<StatusResponse<'static>> {
440 let hostname = hostname::get()
441 .map(|s| s.to_string_lossy().to_string())
442 .unwrap_or_else(|_| "unknown".to_string());
443 let build_info = common_version::build_info();
444 Json(StatusResponse {
445 source_time: build_info.source_time,
446 commit: build_info.commit,
447 branch: build_info.branch,
448 rustc_version: build_info.rustc,
449 hostname,
450 version: build_info.version,
451 })
452}
453
454#[axum_macros::debug_handler]
456pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
457 (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
458}
459
460async fn validate_schema(
461 sql_handler: ServerSqlQueryHandlerRef,
462 query_ctx: QueryContextRef,
463) -> Option<(StatusCode, String)> {
464 match sql_handler
465 .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
466 .await
467 {
468 Ok(true) => None,
469 Ok(false) => Some((
470 StatusCode::DatabaseNotFound,
471 format!("Database not found: {}", query_ctx.get_db_string()),
472 )),
473 Err(e) => Some((
474 StatusCode::Internal,
475 format!(
476 "Error checking database: {}, {}",
477 query_ctx.get_db_string(),
478 e.output_msg(),
479 ),
480 )),
481 }
482}