1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Json, Query, State};
20use axum::response::{IntoResponse, Response};
21use axum::{Extension, Form};
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_plugins::GREPTIME_EXEC_WRITE_COST;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::tracing;
29use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use session::context::{Channel, QueryContext, QueryContextRef};
33use snafu::ResultExt;
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::{ParseOptions, ParserContext};
36use sql::statements::statement::Statement;
37
38use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
39use crate::http::header::collect_plan_metrics;
40use crate::http::result::arrow_result::ArrowResponse;
41use crate::http::result::csv_result::CsvResponse;
42use crate::http::result::error_result::ErrorResponse;
43use crate::http::result::greptime_result_v1::GreptimedbV1Response;
44use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
45use crate::http::result::json_result::JsonResponse;
46use crate::http::result::null_result::NullResponse;
47use crate::http::result::table_result::TableResponse;
48use crate::http::{
49 ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
50 HttpResponse, ResponseFormat,
51};
52use crate::metrics_handler::MetricsHandler;
53use crate::query_handler::sql::ServerSqlQueryHandlerRef;
54
/// Request parameters shared by the SQL HTTP handlers in this file.
///
/// The handlers extract this struct both from the URL query string (`Query<SqlQuery>`) and
/// from the form body (`Form<SqlQuery>`), which is why every field is optional.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    /// Database string. When present, `sql` splits it into catalog and schema with
    /// `parse_catalog_and_schema_from_db_string` and applies both to the query context.
    pub db: Option<String>,
    /// SQL text to execute, parse, or format. The handlers answer with an
    /// "sql parameter is required." error when it is absent.
    pub sql: Option<String>,
    /// Response format name. `sql` lower-cases it and feeds it to `ResponseFormat::parse`,
    /// falling back to `ResponseFormat::GreptimedbV1` when missing or not recognised.
    pub format: Option<String>,
    /// Timestamp precision name. `sql` lower-cases it and feeds it to `Epoch::parse`
    /// (falling back to `Epoch::Millisecond` when not recognised); the result is handed to
    /// the InfluxDB v1 response builder.
    pub epoch: Option<String>,
    /// When present, `sql` applies it to the response through `with_limit`.
    pub limit: Option<usize>,
    /// Forwarded by `sql` to `ArrowResponse::from_output` when the Arrow format is selected.
    pub compression: Option<String>,
}
76
77#[axum_macros::debug_handler]
79#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
80pub async fn sql(
81 State(state): State<ApiState>,
82 Query(query_params): Query<SqlQuery>,
83 Extension(mut query_ctx): Extension<QueryContext>,
84 Form(form_params): Form<SqlQuery>,
85) -> HttpResponse {
86 let start = Instant::now();
87 let sql_handler = &state.sql_handler;
88 if let Some(db) = &query_params.db.or(form_params.db) {
89 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
90 query_ctx.set_current_catalog(&catalog);
91 query_ctx.set_current_schema(&schema);
92 }
93 let db = query_ctx.get_db_string();
94
95 query_ctx.set_channel(Channel::HttpSql);
96 let query_ctx = Arc::new(query_ctx);
97
98 let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
99 .with_label_values(&[db.as_str()])
100 .start_timer();
101
102 let sql = query_params.sql.or(form_params.sql);
103 let format = query_params
104 .format
105 .or(form_params.format)
106 .map(|s| s.to_lowercase())
107 .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
108 .unwrap_or(ResponseFormat::GreptimedbV1);
109 let epoch = query_params
110 .epoch
111 .or(form_params.epoch)
112 .map(|s| s.to_lowercase())
113 .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
114
115 let result = if let Some(sql) = &sql {
116 if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
117 Err((status, msg))
118 } else {
119 Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
120 }
121 } else {
122 Err((
123 StatusCode::InvalidArguments,
124 "sql parameter is required.".to_string(),
125 ))
126 };
127
128 let outputs = match result {
129 Err((status, msg)) => {
130 return HttpResponse::Error(
131 ErrorResponse::from_error_message(status, msg)
132 .with_execution_time(start.elapsed().as_millis() as u64),
133 );
134 }
135 Ok(outputs) => outputs,
136 };
137
138 let mut resp = match format {
139 ResponseFormat::Arrow => {
140 ArrowResponse::from_output(outputs, query_params.compression).await
141 }
142 ResponseFormat::Csv(with_names, with_types) => {
143 CsvResponse::from_output(outputs, with_names, with_types).await
144 }
145 ResponseFormat::Table => TableResponse::from_output(outputs).await,
146 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
147 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
148 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
149 ResponseFormat::Null => NullResponse::from_output(outputs).await,
150 };
151
152 if let Some(limit) = query_params.limit {
153 resp = resp.with_limit(limit);
154 }
155 resp.with_execution_time(start.elapsed().as_millis() as u64)
156}
157
158#[axum_macros::debug_handler]
160#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
161pub async fn sql_parse(
162 Query(query_params): Query<SqlQuery>,
163 Form(form_params): Form<SqlQuery>,
164) -> Result<Json<Vec<Statement>>> {
165 let Some(sql) = query_params.sql.or(form_params.sql) else {
166 return InvalidQuerySnafu {
167 reason: "sql parameter is required.",
168 }
169 .fail();
170 };
171
172 let stmts =
173 ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
174 .context(FailedToParseQuerySnafu)?;
175
176 Ok(stmts.into())
177}
178
/// JSON body returned by `sql_format`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlFormatResponse {
    /// The pretty-printed statements, each terminated by `;` and joined with newlines.
    pub formatted: String,
}
183
184#[axum_macros::debug_handler]
186#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql_format"))]
187pub async fn sql_format(
188 Query(query_params): Query<SqlQuery>,
189 Form(form_params): Form<SqlQuery>,
190) -> axum::response::Response {
191 let Some(sql) = query_params.sql.or(form_params.sql) else {
192 let resp = ErrorResponse::from_error_message(
193 StatusCode::InvalidArguments,
194 "sql parameter is required.".to_string(),
195 );
196 return HttpResponse::Error(resp).into_response();
197 };
198
199 let stmts = match ParserContext::create_with_dialect(
201 &sql,
202 &GreptimeDbDialect {},
203 ParseOptions::default(),
204 ) {
205 Ok(v) => v,
206 Err(e) => return HttpResponse::Error(ErrorResponse::from_error(e)).into_response(),
207 };
208
209 let mut parts: Vec<String> = Vec::with_capacity(stmts.len());
210 for stmt in stmts {
211 let mut s = format!("{:#}", stmt);
212 if !s.trim_end().ends_with(';') {
213 s.push(';');
214 }
215 parts.push(s);
216 }
217
218 let formatted = parts.join("\n");
219 Json(SqlFormatResponse { formatted }).into_response()
220}
221
222pub async fn from_output(
224 outputs: Vec<crate::error::Result<Output>>,
225) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
226 let mut results = Vec::with_capacity(outputs.len());
229 let mut merge_map = HashMap::new();
230
231 for out in outputs {
232 match out {
233 Ok(o) => match o.data {
234 OutputData::AffectedRows(rows) => {
235 results.push(GreptimeQueryOutput::AffectedRows(rows));
236 if o.meta.cost > 0 {
237 merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
238 }
239 }
240 OutputData::Stream(stream) => {
241 let schema = stream.schema().clone();
242 let mut http_record_output = match util::collect(stream).await {
244 Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
245 Ok(rows) => rows,
246 Err(err) => {
247 return Err(ErrorResponse::from_error(err));
248 }
249 },
250 Err(err) => {
251 return Err(ErrorResponse::from_error(err));
252 }
253 };
254 if let Some(physical_plan) = o.meta.plan {
255 let mut result_map = HashMap::new();
256
257 let mut tmp = vec![&mut merge_map, &mut result_map];
258 collect_plan_metrics(&physical_plan, &mut tmp);
259 let re = result_map
260 .into_iter()
261 .map(|(k, v)| (k, Value::from(v)))
262 .collect::<HashMap<String, Value>>();
263 http_record_output.metrics.extend(re);
264 }
265 results.push(GreptimeQueryOutput::Records(http_record_output))
266 }
267 OutputData::RecordBatches(rbs) => {
268 match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
269 Ok(rows) => {
270 results.push(GreptimeQueryOutput::Records(rows));
271 }
272 Err(err) => {
273 return Err(ErrorResponse::from_error(err));
274 }
275 }
276 }
277 },
278
279 Err(err) => {
280 return Err(ErrorResponse::from_error(err));
281 }
282 }
283 }
284
285 let merge_map = merge_map
286 .into_iter()
287 .map(|(k, v)| (k, Value::from(v)))
288 .collect();
289
290 Ok((results, merge_map))
291}
292
/// Query-string parameters accepted by the `promql` HTTP handler.
///
/// `query`, `start`, `end`, `step` and `lookback` are converted into a `PromQuery`; the
/// remaining fields control how the response is rendered.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    /// Query text, forwarded unchanged to `PromQuery::query`.
    pub query: String,
    /// Forwarded unchanged (as a string) to `PromQuery::start`.
    pub start: String,
    /// Forwarded unchanged (as a string) to `PromQuery::end`.
    pub end: String,
    /// Forwarded unchanged (as a string) to `PromQuery::step`.
    pub step: String,
    /// Forwarded to `PromQuery::lookback`; `DEFAULT_LOOKBACK_STRING` is used when absent.
    pub lookback: Option<String>,
    /// NOTE(review): not read by any code visible in this file — presumably consumed
    /// elsewhere before the handler runs; confirm.
    pub db: Option<String>,
    /// Response format name. `promql` lower-cases it and feeds it to `ResponseFormat::parse`,
    /// falling back to `ResponseFormat::GreptimedbV1` when missing or not recognised.
    pub format: Option<String>,
    /// Forwarded by `promql` to `ArrowResponse::from_output` when the Arrow format is selected.
    pub compression: Option<String>,
    /// Timestamp precision name. `promql` lower-cases it and feeds it to `Epoch::parse`
    /// (falling back to `Epoch::Millisecond` when not recognised); the result is handed to
    /// the InfluxDB v1 response builder.
    pub epoch: Option<String>,
}
319
320impl From<PromqlQuery> for PromQuery {
321 fn from(query: PromqlQuery) -> Self {
322 PromQuery {
323 query: query.query,
324 start: query.start,
325 end: query.end,
326 step: query.step,
327 lookback: query
328 .lookback
329 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
330 }
331 }
332}
333
334#[axum_macros::debug_handler]
336#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
337pub async fn promql(
338 State(state): State<ApiState>,
339 Query(params): Query<PromqlQuery>,
340 Extension(mut query_ctx): Extension<QueryContext>,
341) -> Response {
342 let sql_handler = &state.sql_handler;
343 let exec_start = Instant::now();
344 let db = query_ctx.get_db_string();
345
346 query_ctx.set_channel(Channel::Promql);
347 let query_ctx = Arc::new(query_ctx);
348
349 let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
350 .with_label_values(&[db.as_str()])
351 .start_timer();
352
353 let resp = if let Some((status, msg)) =
354 validate_schema(sql_handler.clone(), query_ctx.clone()).await
355 {
356 let resp = ErrorResponse::from_error_message(status, msg);
357 HttpResponse::Error(resp)
358 } else {
359 let format = params
360 .format
361 .as_ref()
362 .map(|s| s.to_lowercase())
363 .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
364 .unwrap_or(ResponseFormat::GreptimedbV1);
365 let epoch = params
366 .epoch
367 .as_ref()
368 .map(|s| s.to_lowercase())
369 .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
370 let compression = params.compression.clone();
371
372 let prom_query = params.into();
373 let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
374
375 match format {
376 ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
377 ResponseFormat::Csv(with_names, with_types) => {
378 CsvResponse::from_output(outputs, with_names, with_types).await
379 }
380 ResponseFormat::Table => TableResponse::from_output(outputs).await,
381 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
382 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
383 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
384 ResponseFormat::Null => NullResponse::from_output(outputs).await,
385 }
386 };
387
388 resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
389 .into_response()
390}
391
392#[axum_macros::debug_handler]
394pub async fn metrics(
395 State(state): State<MetricsHandler>,
396 Query(_params): Query<HashMap<String, String>>,
397) -> String {
398 #[cfg(not(windows))]
403 if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
404 if let Err(e) = c.update() {
405 common_telemetry::error!(e; "Failed to update jemalloc metrics");
406 }
407 }
408 state.render()
409}
410
/// Query-string parameters of the `health` handler; currently carries no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}
413
/// JSON body returned by the `health` handler; currently carries no fields.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}
416
417#[axum_macros::debug_handler]
421pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
422 Json(HealthResponse {})
423}
424
/// JSON body returned by the `status` handler.
///
/// All borrowed fields are filled from `common_version::build_info()`; `hostname` is
/// resolved at request time.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    /// `source_time` from the build info.
    pub source_time: &'a str,
    /// `commit` from the build info.
    pub commit: &'a str,
    /// `branch` from the build info.
    pub branch: &'a str,
    /// `rustc` from the build info.
    pub rustc_version: &'a str,
    /// Result of `hostname::get()`, or `"unknown"` when the lookup fails.
    pub hostname: String,
    /// `version` from the build info.
    pub version: &'a str,
}
434
435#[axum_macros::debug_handler]
437pub async fn status() -> Json<StatusResponse<'static>> {
438 let hostname = hostname::get()
439 .map(|s| s.to_string_lossy().to_string())
440 .unwrap_or_else(|_| "unknown".to_string());
441 let build_info = common_version::build_info();
442 Json(StatusResponse {
443 source_time: build_info.source_time,
444 commit: build_info.commit,
445 branch: build_info.branch,
446 rustc_version: build_info.rustc,
447 hostname,
448 version: build_info.version,
449 })
450}
451
452#[axum_macros::debug_handler]
454pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
455 (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
456}
457
458async fn validate_schema(
459 sql_handler: ServerSqlQueryHandlerRef,
460 query_ctx: QueryContextRef,
461) -> Option<(StatusCode, String)> {
462 match sql_handler
463 .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
464 .await
465 {
466 Ok(true) => None,
467 Ok(false) => Some((
468 StatusCode::DatabaseNotFound,
469 format!("Database not found: {}", query_ctx.get_db_string()),
470 )),
471 Err(e) => Some((
472 StatusCode::Internal,
473 format!(
474 "Error checking database: {}, {}",
475 query_ctx.get_db_string(),
476 e.output_msg(),
477 ),
478 )),
479 }
480}