// servers/http/handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Json, Query, State};
20use axum::response::{IntoResponse, Response};
21use axum::{Extension, Form};
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_plugins::GREPTIME_EXEC_WRITE_COST;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::tracing;
29use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery};
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use session::context::{Channel, QueryContext, QueryContextRef};
33use snafu::ResultExt;
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::{ParseOptions, ParserContext};
36use sql::statements::statement::Statement;
37
38use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
39use crate::http::header::collect_plan_metrics;
40use crate::http::result::arrow_result::ArrowResponse;
41use crate::http::result::csv_result::CsvResponse;
42use crate::http::result::error_result::ErrorResponse;
43use crate::http::result::greptime_result_v1::GreptimedbV1Response;
44use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
45use crate::http::result::json_result::JsonResponse;
46use crate::http::result::null_result::NullResponse;
47use crate::http::result::table_result::TableResponse;
48use crate::http::{
49    ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
50    HttpResponse, ResponseFormat,
51};
52use crate::metrics_handler::MetricsHandler;
53use crate::query_handler::sql::ServerSqlQueryHandlerRef;
54
/// Parameters accepted by the SQL-related HTTP endpoints.
///
/// Handlers may read these from the URL query string and/or the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    /// Target database string, parsed into catalog and schema by the handler.
    pub db: Option<String>,
    /// The SQL statement(s) to execute.
    pub sql: Option<String>,
    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    // `arrow`],
    // the default value is `greptimedb_v1`
    pub format: Option<String>,
    // Returns epoch timestamps with the specified precision.
    // Both u and µ indicate microseconds.
    // epoch = [ns,u,µ,ms,s],
    //
    // TODO(jeremy): currently, only InfluxDB result format is supported,
    // and all columns of the `Timestamp` type will be converted to their
    // specified time precision. Maybe greptimedb format can support this
    // param too.
    pub epoch: Option<String>,
    /// Optional limit applied to the response rows.
    pub limit: Option<usize>,
    // For arrow output
    pub compression: Option<String>,
}
76
77/// Handler to execute sql
78#[axum_macros::debug_handler]
79#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
80pub async fn sql(
81    State(state): State<ApiState>,
82    Query(query_params): Query<SqlQuery>,
83    Extension(mut query_ctx): Extension<QueryContext>,
84    Form(form_params): Form<SqlQuery>,
85) -> HttpResponse {
86    let start = Instant::now();
87    let sql_handler = &state.sql_handler;
88    if let Some(db) = &query_params.db.or(form_params.db) {
89        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
90        query_ctx.set_current_catalog(&catalog);
91        query_ctx.set_current_schema(&schema);
92    }
93    let db = query_ctx.get_db_string();
94
95    query_ctx.set_channel(Channel::HttpSql);
96    let query_ctx = Arc::new(query_ctx);
97
98    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
99        .with_label_values(&[db.as_str()])
100        .start_timer();
101
102    let sql = query_params.sql.or(form_params.sql);
103    let format = query_params
104        .format
105        .or(form_params.format)
106        .map(|s| s.to_lowercase())
107        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
108        .unwrap_or(ResponseFormat::GreptimedbV1);
109    let epoch = query_params
110        .epoch
111        .or(form_params.epoch)
112        .map(|s| s.to_lowercase())
113        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
114
115    let result = if let Some(sql) = &sql {
116        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
117            Err((status, msg))
118        } else {
119            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
120        }
121    } else {
122        Err((
123            StatusCode::InvalidArguments,
124            "sql parameter is required.".to_string(),
125        ))
126    };
127
128    let outputs = match result {
129        Err((status, msg)) => {
130            return HttpResponse::Error(
131                ErrorResponse::from_error_message(status, msg)
132                    .with_execution_time(start.elapsed().as_millis() as u64),
133            );
134        }
135        Ok(outputs) => outputs,
136    };
137
138    let mut resp = match format {
139        ResponseFormat::Arrow => {
140            ArrowResponse::from_output(outputs, query_params.compression).await
141        }
142        ResponseFormat::Csv(with_names, with_types) => {
143            CsvResponse::from_output(outputs, with_names, with_types).await
144        }
145        ResponseFormat::Table => TableResponse::from_output(outputs).await,
146        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
147        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
148        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
149        ResponseFormat::Null => NullResponse::from_output(outputs).await,
150    };
151
152    if let Some(limit) = query_params.limit {
153        resp = resp.with_limit(limit);
154    }
155    resp.with_execution_time(start.elapsed().as_millis() as u64)
156}
157
158/// Handler to parse sql
159#[axum_macros::debug_handler]
160#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
161pub async fn sql_parse(
162    Query(query_params): Query<SqlQuery>,
163    Form(form_params): Form<SqlQuery>,
164) -> Result<Json<Vec<Statement>>> {
165    let Some(sql) = query_params.sql.or(form_params.sql) else {
166        return InvalidQuerySnafu {
167            reason: "sql parameter is required.",
168        }
169        .fail();
170    };
171
172    let stmts =
173        ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
174            .context(FailedToParseQuerySnafu)?;
175
176    Ok(stmts.into())
177}
178
/// Response body of the SQL formatting endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlFormatResponse {
    /// The re-rendered SQL; statements are separated by newlines and each
    /// ends with a semicolon.
    pub formatted: String,
}
183
184/// Handler to format sql string
185#[axum_macros::debug_handler]
186#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql_format"))]
187pub async fn sql_format(
188    Query(query_params): Query<SqlQuery>,
189    Form(form_params): Form<SqlQuery>,
190) -> axum::response::Response {
191    let Some(sql) = query_params.sql.or(form_params.sql) else {
192        let resp = ErrorResponse::from_error_message(
193            StatusCode::InvalidArguments,
194            "sql parameter is required.".to_string(),
195        );
196        return HttpResponse::Error(resp).into_response();
197    };
198
199    // Parse using GreptimeDB dialect then reconstruct statements via Display
200    let stmts = match ParserContext::create_with_dialect(
201        &sql,
202        &GreptimeDbDialect {},
203        ParseOptions::default(),
204    ) {
205        Ok(v) => v,
206        Err(e) => return HttpResponse::Error(ErrorResponse::from_error(e)).into_response(),
207    };
208
209    let mut parts: Vec<String> = Vec::with_capacity(stmts.len());
210    for stmt in stmts {
211        let mut s = format!("{:#}", stmt);
212        if !s.trim_end().ends_with(';') {
213            s.push(';');
214        }
215        parts.push(s);
216    }
217
218    let formatted = parts.join("\n");
219    Json(SqlFormatResponse { formatted }).into_response()
220}
221
/// Create a response from query result
///
/// Converts each executed [`Output`] into a serializable
/// [`GreptimeQueryOutput`] and merges per-query execution metrics into a
/// single map returned alongside the results. Returns an [`ErrorResponse`]
/// as soon as any output (or record-batch collection) fails, discarding any
/// results gathered so far.
pub async fn from_output(
    outputs: Vec<crate::error::Result<Output>>,
) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
    // TODO(sunng87): this api response structure cannot represent error well.
    //  It hides successful execution results from error response
    let mut results = Vec::with_capacity(outputs.len());
    // Metrics accumulated across all outputs (e.g. write cost, plan metrics).
    let mut merge_map = HashMap::new();

    for out in outputs {
        match out {
            Ok(o) => match o.data {
                OutputData::AffectedRows(rows) => {
                    results.push(GreptimeQueryOutput::AffectedRows(rows));
                    // Only record the write cost when it is non-zero.
                    if o.meta.cost > 0 {
                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
                    }
                }
                OutputData::Stream(stream) => {
                    let schema = stream.schema().clone();
                    // TODO(sunng87): streaming response
                    // Drain the stream fully into memory before building the
                    // HTTP records output.
                    let mut http_record_output = match util::collect(stream).await {
                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                            Ok(rows) => rows,
                            Err(err) => {
                                return Err(ErrorResponse::from_error(err));
                            }
                        },
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    };
                    if let Some(physical_plan) = o.meta.plan {
                        let mut result_map = HashMap::new();

                        // Collect plan metrics into both the shared merge map
                        // and this output's own metrics map.
                        let mut tmp = vec![&mut merge_map, &mut result_map];
                        collect_plan_metrics(&physical_plan, &mut tmp);
                        let re = result_map
                            .into_iter()
                            .map(|(k, v)| (k, Value::from(v)))
                            .collect::<HashMap<String, Value>>();
                        http_record_output.metrics.extend(re);
                    }
                    results.push(GreptimeQueryOutput::Records(http_record_output))
                }
                OutputData::RecordBatches(rbs) => {
                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
                        Ok(rows) => {
                            results.push(GreptimeQueryOutput::Records(rows));
                        }
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    }
                }
            },

            Err(err) => {
                return Err(ErrorResponse::from_error(err));
            }
        }
    }

    // Convert accumulated numeric metrics into JSON values for the response.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();

    Ok((results, merge_map))
}
292
/// Parameters accepted by the HTTP PromQL endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    /// The PromQL expression to evaluate.
    pub query: String,
    /// Range start.
    pub start: String,
    /// Range end.
    pub end: String,
    /// Resolution step.
    pub step: String,
    /// Lookback delta; falls back to `DEFAULT_LOOKBACK_STRING` when absent.
    pub lookback: Option<String>,
    /// Target database.
    pub db: Option<String>,
    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    // `arrow`],
    // the default value is `greptimedb_v1`
    pub format: Option<String>,
    // For arrow output
    pub compression: Option<String>,
    // Returns epoch timestamps with the specified precision.
    // Both u and µ indicate microseconds.
    // epoch = [ns,u,µ,ms,s],
    //
    // For influx output only
    //
    // TODO(jeremy): currently, only InfluxDB result format is supported,
    // and all columns of the `Timestamp` type will be converted to their
    // specified time precision. Maybe greptimedb format can support this
    // param too.
    pub epoch: Option<String>,
}
319
320impl From<PromqlQuery> for PromQuery {
321    fn from(query: PromqlQuery) -> Self {
322        PromQuery {
323            query: query.query,
324            start: query.start,
325            end: query.end,
326            step: query.step,
327            lookback: query
328                .lookback
329                .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
330            // TODO(dennis): support alias from http params or parse from query.query
331            alias: None,
332        }
333    }
334}
335
336/// Handler to execute promql
337#[axum_macros::debug_handler]
338#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
339pub async fn promql(
340    State(state): State<ApiState>,
341    Query(params): Query<PromqlQuery>,
342    Extension(mut query_ctx): Extension<QueryContext>,
343) -> Response {
344    let sql_handler = &state.sql_handler;
345    let exec_start = Instant::now();
346    let db = query_ctx.get_db_string();
347
348    query_ctx.set_channel(Channel::Promql);
349    let query_ctx = Arc::new(query_ctx);
350
351    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
352        .with_label_values(&[db.as_str()])
353        .start_timer();
354
355    let resp = if let Some((status, msg)) =
356        validate_schema(sql_handler.clone(), query_ctx.clone()).await
357    {
358        let resp = ErrorResponse::from_error_message(status, msg);
359        HttpResponse::Error(resp)
360    } else {
361        let format = params
362            .format
363            .as_ref()
364            .map(|s| s.to_lowercase())
365            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
366            .unwrap_or(ResponseFormat::GreptimedbV1);
367        let epoch = params
368            .epoch
369            .as_ref()
370            .map(|s| s.to_lowercase())
371            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
372        let compression = params.compression.clone();
373
374        let prom_query = params.into();
375        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
376
377        match format {
378            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
379            ResponseFormat::Csv(with_names, with_types) => {
380                CsvResponse::from_output(outputs, with_names, with_types).await
381            }
382            ResponseFormat::Table => TableResponse::from_output(outputs).await,
383            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
384            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
385            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
386            ResponseFormat::Null => NullResponse::from_output(outputs).await,
387        }
388    };
389
390    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
391        .into_response()
392}
393
394/// Handler to export metrics
395#[axum_macros::debug_handler]
396pub async fn metrics(
397    State(state): State<MetricsHandler>,
398    Query(_params): Query<HashMap<String, String>>,
399) -> String {
400    // A default ProcessCollector is registered automatically in prometheus.
401    // We do not need to explicitly collect process-related data.
402    // But ProcessCollector only support on linux.
403
404    #[cfg(not(windows))]
405    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref()
406        && let Err(e) = c.update()
407    {
408        common_telemetry::error!(e; "Failed to update jemalloc metrics");
409    }
410    state.render()
411}
412
/// Query parameters for the health-check endpoint (currently none).
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}
415
/// Payload of a successful health check: an empty JSON object.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}
418
419/// Handler to export healthy check
420///
421/// Currently simply return status "200 OK" (default) with an empty json payload "{}"
422#[axum_macros::debug_handler]
423pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
424    Json(HealthResponse {})
425}
426
/// Build and runtime information returned by the status endpoint.
///
/// Most fields borrow from process-wide build info (hence the `'a`
/// lifetime); `hostname` is resolved at request time and owned.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    pub source_time: &'a str,
    pub commit: &'a str,
    pub branch: &'a str,
    pub rustc_version: &'a str,
    pub hostname: String,
    pub version: &'a str,
}
436
437/// Handler to expose information info about runtime, build, etc.
438#[axum_macros::debug_handler]
439pub async fn status() -> Json<StatusResponse<'static>> {
440    let hostname = hostname::get()
441        .map(|s| s.to_string_lossy().to_string())
442        .unwrap_or_else(|_| "unknown".to_string());
443    let build_info = common_version::build_info();
444    Json(StatusResponse {
445        source_time: build_info.source_time,
446        commit: build_info.commit,
447        branch: build_info.branch,
448        rustc_version: build_info.rustc,
449        hostname,
450        version: build_info.version,
451    })
452}
453
454/// Handler to expose configuration information info about runtime, build, etc.
455#[axum_macros::debug_handler]
456pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
457    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
458}
459
460async fn validate_schema(
461    sql_handler: ServerSqlQueryHandlerRef,
462    query_ctx: QueryContextRef,
463) -> Option<(StatusCode, String)> {
464    match sql_handler
465        .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
466        .await
467    {
468        Ok(true) => None,
469        Ok(false) => Some((
470            StatusCode::DatabaseNotFound,
471            format!("Database not found: {}", query_ctx.get_db_string()),
472        )),
473        Err(e) => Some((
474            StatusCode::Internal,
475            format!(
476                "Error checking database: {}, {}",
477                query_ctx.get_db_string(),
478                e.output_msg(),
479            ),
480        )),
481    }
482}