1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Json, Query, State};
20use axum::response::{IntoResponse, Response};
21use axum::{Extension, Form};
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_plugins::GREPTIME_EXEC_WRITE_COST;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::tracing;
29use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use session::context::{Channel, QueryContext, QueryContextRef};
33use snafu::ResultExt;
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::{ParseOptions, ParserContext};
36use sql::statements::statement::Statement;
37
38use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
39use crate::http::header::collect_plan_metrics;
40use crate::http::result::arrow_result::ArrowResponse;
41use crate::http::result::csv_result::CsvResponse;
42use crate::http::result::error_result::ErrorResponse;
43use crate::http::result::greptime_result_v1::GreptimedbV1Response;
44use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
45use crate::http::result::json_result::JsonResponse;
46use crate::http::result::table_result::TableResponse;
47use crate::http::{
48 ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
49 HttpResponse, ResponseFormat,
50};
51use crate::metrics_handler::MetricsHandler;
52use crate::query_handler::sql::ServerSqlQueryHandlerRef;
53
54#[derive(Debug, Default, Serialize, Deserialize)]
55pub struct SqlQuery {
56 pub db: Option<String>,
57 pub sql: Option<String>,
58 pub format: Option<String>,
62 pub epoch: Option<String>,
71 pub limit: Option<usize>,
72 pub compression: Option<String>,
74}
75
76#[axum_macros::debug_handler]
78#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
79pub async fn sql(
80 State(state): State<ApiState>,
81 Query(query_params): Query<SqlQuery>,
82 Extension(mut query_ctx): Extension<QueryContext>,
83 Form(form_params): Form<SqlQuery>,
84) -> HttpResponse {
85 let start = Instant::now();
86 let sql_handler = &state.sql_handler;
87 if let Some(db) = &query_params.db.or(form_params.db) {
88 let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
89 query_ctx.set_current_catalog(&catalog);
90 query_ctx.set_current_schema(&schema);
91 }
92 let db = query_ctx.get_db_string();
93
94 query_ctx.set_channel(Channel::Http);
95 let query_ctx = Arc::new(query_ctx);
96
97 let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
98 .with_label_values(&[db.as_str()])
99 .start_timer();
100
101 let sql = query_params.sql.or(form_params.sql);
102 let format = query_params
103 .format
104 .or(form_params.format)
105 .map(|s| s.to_lowercase())
106 .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
107 .unwrap_or(ResponseFormat::GreptimedbV1);
108 let epoch = query_params
109 .epoch
110 .or(form_params.epoch)
111 .map(|s| s.to_lowercase())
112 .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
113
114 let result = if let Some(sql) = &sql {
115 if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
116 Err((status, msg))
117 } else {
118 Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
119 }
120 } else {
121 Err((
122 StatusCode::InvalidArguments,
123 "sql parameter is required.".to_string(),
124 ))
125 };
126
127 let outputs = match result {
128 Err((status, msg)) => {
129 return HttpResponse::Error(
130 ErrorResponse::from_error_message(status, msg)
131 .with_execution_time(start.elapsed().as_millis() as u64),
132 );
133 }
134 Ok(outputs) => outputs,
135 };
136
137 let mut resp = match format {
138 ResponseFormat::Arrow => {
139 ArrowResponse::from_output(outputs, query_params.compression).await
140 }
141 ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
142 ResponseFormat::Table => TableResponse::from_output(outputs).await,
143 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
144 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
145 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
146 };
147
148 if let Some(limit) = query_params.limit {
149 resp = resp.with_limit(limit);
150 }
151 resp.with_execution_time(start.elapsed().as_millis() as u64)
152}
153
154#[axum_macros::debug_handler]
156#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
157pub async fn sql_parse(
158 Query(query_params): Query<SqlQuery>,
159 Form(form_params): Form<SqlQuery>,
160) -> Result<Json<Vec<Statement>>> {
161 let Some(sql) = query_params.sql.or(form_params.sql) else {
162 return InvalidQuerySnafu {
163 reason: "sql parameter is required.",
164 }
165 .fail();
166 };
167
168 let stmts =
169 ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
170 .context(FailedToParseQuerySnafu)?;
171
172 Ok(stmts.into())
173}
174
175pub async fn from_output(
177 outputs: Vec<crate::error::Result<Output>>,
178) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
179 let mut results = Vec::with_capacity(outputs.len());
182 let mut merge_map = HashMap::new();
183
184 for out in outputs {
185 match out {
186 Ok(o) => match o.data {
187 OutputData::AffectedRows(rows) => {
188 results.push(GreptimeQueryOutput::AffectedRows(rows));
189 if o.meta.cost > 0 {
190 merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
191 }
192 }
193 OutputData::Stream(stream) => {
194 let schema = stream.schema().clone();
195 let mut http_record_output = match util::collect(stream).await {
197 Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
198 Ok(rows) => rows,
199 Err(err) => {
200 return Err(ErrorResponse::from_error(err));
201 }
202 },
203 Err(err) => {
204 return Err(ErrorResponse::from_error(err));
205 }
206 };
207 if let Some(physical_plan) = o.meta.plan {
208 let mut result_map = HashMap::new();
209
210 let mut tmp = vec![&mut merge_map, &mut result_map];
211 collect_plan_metrics(&physical_plan, &mut tmp);
212 let re = result_map
213 .into_iter()
214 .map(|(k, v)| (k, Value::from(v)))
215 .collect::<HashMap<String, Value>>();
216 http_record_output.metrics.extend(re);
217 }
218 results.push(GreptimeQueryOutput::Records(http_record_output))
219 }
220 OutputData::RecordBatches(rbs) => {
221 match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
222 Ok(rows) => {
223 results.push(GreptimeQueryOutput::Records(rows));
224 }
225 Err(err) => {
226 return Err(ErrorResponse::from_error(err));
227 }
228 }
229 }
230 },
231
232 Err(err) => {
233 return Err(ErrorResponse::from_error(err));
234 }
235 }
236 }
237
238 let merge_map = merge_map
239 .into_iter()
240 .map(|(k, v)| (k, Value::from(v)))
241 .collect();
242
243 Ok((results, merge_map))
244}
245
246#[derive(Debug, Default, Serialize, Deserialize)]
247pub struct PromqlQuery {
248 pub query: String,
249 pub start: String,
250 pub end: String,
251 pub step: String,
252 pub lookback: Option<String>,
253 pub db: Option<String>,
254 pub format: Option<String>,
258 pub compression: Option<String>,
260 pub epoch: Option<String>,
271}
272
273impl From<PromqlQuery> for PromQuery {
274 fn from(query: PromqlQuery) -> Self {
275 PromQuery {
276 query: query.query,
277 start: query.start,
278 end: query.end,
279 step: query.step,
280 lookback: query
281 .lookback
282 .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
283 }
284 }
285}
286
287#[axum_macros::debug_handler]
289#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
290pub async fn promql(
291 State(state): State<ApiState>,
292 Query(params): Query<PromqlQuery>,
293 Extension(mut query_ctx): Extension<QueryContext>,
294) -> Response {
295 let sql_handler = &state.sql_handler;
296 let exec_start = Instant::now();
297 let db = query_ctx.get_db_string();
298
299 query_ctx.set_channel(Channel::Http);
300 let query_ctx = Arc::new(query_ctx);
301
302 let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
303 .with_label_values(&[db.as_str()])
304 .start_timer();
305
306 let resp = if let Some((status, msg)) =
307 validate_schema(sql_handler.clone(), query_ctx.clone()).await
308 {
309 let resp = ErrorResponse::from_error_message(status, msg);
310 HttpResponse::Error(resp)
311 } else {
312 let format = params
313 .format
314 .as_ref()
315 .map(|s| s.to_lowercase())
316 .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
317 .unwrap_or(ResponseFormat::GreptimedbV1);
318 let epoch = params
319 .epoch
320 .as_ref()
321 .map(|s| s.to_lowercase())
322 .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
323 let compression = params.compression.clone();
324
325 let prom_query = params.into();
326 let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
327
328 match format {
329 ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
330 ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
331 ResponseFormat::Table => TableResponse::from_output(outputs).await,
332 ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
333 ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
334 ResponseFormat::Json => JsonResponse::from_output(outputs).await,
335 }
336 };
337
338 resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
339 .into_response()
340}
341
342#[axum_macros::debug_handler]
344pub async fn metrics(
345 State(state): State<MetricsHandler>,
346 Query(_params): Query<HashMap<String, String>>,
347) -> String {
348 #[cfg(not(windows))]
353 if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
354 if let Err(e) = c.update() {
355 common_telemetry::error!(e; "Failed to update jemalloc metrics");
356 }
357 }
358 state.render()
359}
360
361#[derive(Debug, Serialize, Deserialize)]
362pub struct HealthQuery {}
363
364#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
365pub struct HealthResponse {}
366
367#[axum_macros::debug_handler]
371pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
372 Json(HealthResponse {})
373}
374
375#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
376pub struct StatusResponse<'a> {
377 pub source_time: &'a str,
378 pub commit: &'a str,
379 pub branch: &'a str,
380 pub rustc_version: &'a str,
381 pub hostname: String,
382 pub version: &'a str,
383}
384
385#[axum_macros::debug_handler]
387pub async fn status() -> Json<StatusResponse<'static>> {
388 let hostname = hostname::get()
389 .map(|s| s.to_string_lossy().to_string())
390 .unwrap_or_else(|_| "unknown".to_string());
391 let build_info = common_version::build_info();
392 Json(StatusResponse {
393 source_time: build_info.source_time,
394 commit: build_info.commit,
395 branch: build_info.branch,
396 rustc_version: build_info.rustc,
397 hostname,
398 version: build_info.version,
399 })
400}
401
402#[axum_macros::debug_handler]
404pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
405 (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
406}
407
408async fn validate_schema(
409 sql_handler: ServerSqlQueryHandlerRef,
410 query_ctx: QueryContextRef,
411) -> Option<(StatusCode, String)> {
412 match sql_handler
413 .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
414 .await
415 {
416 Ok(true) => None,
417 Ok(false) => Some((
418 StatusCode::DatabaseNotFound,
419 format!("Database not found: {}", query_ctx.get_db_string()),
420 )),
421 Err(e) => Some((
422 StatusCode::Internal,
423 format!(
424 "Error checking database: {}, {}",
425 query_ctx.get_db_string(),
426 e.output_msg(),
427 ),
428 )),
429 }
430}