servers/http/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Json, Query, State};
20use axum::response::{IntoResponse, Response};
21use axum::{Extension, Form};
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_plugins::GREPTIME_EXEC_WRITE_COST;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::tracing;
29use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use session::context::{Channel, QueryContext, QueryContextRef};
33use snafu::ResultExt;
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::{ParseOptions, ParserContext};
36use sql::statements::statement::Statement;
37
38use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
39use crate::http::header::collect_plan_metrics;
40use crate::http::result::arrow_result::ArrowResponse;
41use crate::http::result::csv_result::CsvResponse;
42use crate::http::result::error_result::ErrorResponse;
43use crate::http::result::greptime_result_v1::GreptimedbV1Response;
44use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
45use crate::http::result::json_result::JsonResponse;
46use crate::http::result::null_result::NullResponse;
47use crate::http::result::table_result::TableResponse;
48use crate::http::{
49    ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
50    HttpResponse, ResponseFormat,
51};
52use crate::metrics_handler::MetricsHandler;
53use crate::query_handler::sql::ServerSqlQueryHandlerRef;
54
/// Parameters accepted by the SQL HTTP endpoint, via either the URL query
/// string or an urlencoded form body (see [`sql`] and [`sql_parse`]).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    // Target database, parsed as `catalog-schema` or a plain schema name.
    pub db: Option<String>,
    // The SQL statement(s) to execute; required by the `sql` handler.
    pub sql: Option<String>,
    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    // `arrow`],
    // the default value is `greptimedb_v1`
    pub format: Option<String>,
    // Returns epoch timestamps with the specified precision.
    // Both u and µ indicate microseconds.
    // epoch = [ns,u,µ,ms,s],
    //
    // TODO(jeremy): currently, only InfluxDB result format is supported,
    // and all columns of the `Timestamp` type will be converted to their
    // specified time precision. Maybe greptimedb format can support this
    // param too.
    pub epoch: Option<String>,
    // Optional cap on the number of rows included in the response.
    pub limit: Option<usize>,
    // For arrow output
    pub compression: Option<String>,
}
76
77/// Handler to execute sql
78#[axum_macros::debug_handler]
79#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
80pub async fn sql(
81    State(state): State<ApiState>,
82    Query(query_params): Query<SqlQuery>,
83    Extension(mut query_ctx): Extension<QueryContext>,
84    Form(form_params): Form<SqlQuery>,
85) -> HttpResponse {
86    let start = Instant::now();
87    let sql_handler = &state.sql_handler;
88    if let Some(db) = &query_params.db.or(form_params.db) {
89        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
90        query_ctx.set_current_catalog(&catalog);
91        query_ctx.set_current_schema(&schema);
92    }
93    let db = query_ctx.get_db_string();
94
95    query_ctx.set_channel(Channel::HttpSql);
96    let query_ctx = Arc::new(query_ctx);
97
98    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
99        .with_label_values(&[db.as_str()])
100        .start_timer();
101
102    let sql = query_params.sql.or(form_params.sql);
103    let format = query_params
104        .format
105        .or(form_params.format)
106        .map(|s| s.to_lowercase())
107        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
108        .unwrap_or(ResponseFormat::GreptimedbV1);
109    let epoch = query_params
110        .epoch
111        .or(form_params.epoch)
112        .map(|s| s.to_lowercase())
113        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
114
115    let result = if let Some(sql) = &sql {
116        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
117            Err((status, msg))
118        } else {
119            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
120        }
121    } else {
122        Err((
123            StatusCode::InvalidArguments,
124            "sql parameter is required.".to_string(),
125        ))
126    };
127
128    let outputs = match result {
129        Err((status, msg)) => {
130            return HttpResponse::Error(
131                ErrorResponse::from_error_message(status, msg)
132                    .with_execution_time(start.elapsed().as_millis() as u64),
133            );
134        }
135        Ok(outputs) => outputs,
136    };
137
138    let mut resp = match format {
139        ResponseFormat::Arrow => {
140            ArrowResponse::from_output(outputs, query_params.compression).await
141        }
142        ResponseFormat::Csv(with_names, with_types) => {
143            CsvResponse::from_output(outputs, with_names, with_types).await
144        }
145        ResponseFormat::Table => TableResponse::from_output(outputs).await,
146        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
147        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
148        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
149        ResponseFormat::Null => NullResponse::from_output(outputs).await,
150    };
151
152    if let Some(limit) = query_params.limit {
153        resp = resp.with_limit(limit);
154    }
155    resp.with_execution_time(start.elapsed().as_millis() as u64)
156}
157
158/// Handler to parse sql
159#[axum_macros::debug_handler]
160#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
161pub async fn sql_parse(
162    Query(query_params): Query<SqlQuery>,
163    Form(form_params): Form<SqlQuery>,
164) -> Result<Json<Vec<Statement>>> {
165    let Some(sql) = query_params.sql.or(form_params.sql) else {
166        return InvalidQuerySnafu {
167            reason: "sql parameter is required.",
168        }
169        .fail();
170    };
171
172    let stmts =
173        ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
174            .context(FailedToParseQuerySnafu)?;
175
176    Ok(stmts.into())
177}
178
/// Response payload of the [`sql_format`] endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlFormatResponse {
    // The input statements, pretty-printed and joined with newlines.
    pub formatted: String,
}
183
184/// Handler to format sql string
185#[axum_macros::debug_handler]
186#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql_format"))]
187pub async fn sql_format(
188    Query(query_params): Query<SqlQuery>,
189    Form(form_params): Form<SqlQuery>,
190) -> axum::response::Response {
191    let Some(sql) = query_params.sql.or(form_params.sql) else {
192        let resp = ErrorResponse::from_error_message(
193            StatusCode::InvalidArguments,
194            "sql parameter is required.".to_string(),
195        );
196        return HttpResponse::Error(resp).into_response();
197    };
198
199    // Parse using GreptimeDB dialect then reconstruct statements via Display
200    let stmts = match ParserContext::create_with_dialect(
201        &sql,
202        &GreptimeDbDialect {},
203        ParseOptions::default(),
204    ) {
205        Ok(v) => v,
206        Err(e) => return HttpResponse::Error(ErrorResponse::from_error(e)).into_response(),
207    };
208
209    let mut parts: Vec<String> = Vec::with_capacity(stmts.len());
210    for stmt in stmts {
211        let mut s = format!("{:#}", stmt);
212        if !s.trim_end().ends_with(';') {
213            s.push(';');
214        }
215        parts.push(s);
216    }
217
218    let formatted = parts.join("\n");
219    Json(SqlFormatResponse { formatted }).into_response()
220}
221
/// Create a response from query result
///
/// Converts each per-statement result into a [`GreptimeQueryOutput`] and
/// merges metrics from all outputs into a single map. Returns the first
/// failed output as an [`ErrorResponse`], discarding earlier successes.
pub async fn from_output(
    outputs: Vec<crate::error::Result<Output>>,
) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
    // TODO(sunng87): this api response structure cannot represent error well.
    //  It hides successful execution results from error response
    let mut results = Vec::with_capacity(outputs.len());
    let mut merge_map = HashMap::new();

    for out in outputs {
        match out {
            Ok(o) => match o.data {
                OutputData::AffectedRows(rows) => {
                    results.push(GreptimeQueryOutput::AffectedRows(rows));
                    // Only record the write cost when it is non-zero.
                    if o.meta.cost > 0 {
                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
                    }
                }
                OutputData::Stream(stream) => {
                    let schema = stream.schema().clone();
                    // TODO(sunng87): streaming response
                    // Drain the whole stream into memory, then build the
                    // records payload; either step can fail independently.
                    let mut http_record_output = match util::collect(stream).await {
                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                            Ok(rows) => rows,
                            Err(err) => {
                                return Err(ErrorResponse::from_error(err));
                            }
                        },
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    };
                    if let Some(physical_plan) = o.meta.plan {
                        let mut result_map = HashMap::new();

                        // Collect plan metrics into both the shared merge map
                        // and this output's own metrics map.
                        let mut tmp = vec![&mut merge_map, &mut result_map];
                        collect_plan_metrics(&physical_plan, &mut tmp);
                        let re = result_map
                            .into_iter()
                            .map(|(k, v)| (k, Value::from(v)))
                            .collect::<HashMap<String, Value>>();
                        http_record_output.metrics.extend(re);
                    }
                    results.push(GreptimeQueryOutput::Records(http_record_output))
                }
                OutputData::RecordBatches(rbs) => {
                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
                        Ok(rows) => {
                            results.push(GreptimeQueryOutput::Records(rows));
                        }
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    }
                }
            },

            Err(err) => {
                return Err(ErrorResponse::from_error(err));
            }
        }
    }

    // Convert the accumulated numeric metrics into JSON values.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();

    Ok((results, merge_map))
}
292
/// Parameters accepted by the PromQL HTTP endpoint (see [`promql`]).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    // The PromQL expression to evaluate (passed through to [`PromQuery`]).
    pub query: String,
    // Start of the evaluation range (passed through to [`PromQuery`]).
    pub start: String,
    // End of the evaluation range (passed through to [`PromQuery`]).
    pub end: String,
    // Evaluation step (passed through to [`PromQuery`]).
    pub step: String,
    // Lookback delta; defaults to `DEFAULT_LOOKBACK_STRING` when omitted.
    pub lookback: Option<String>,
    // Target database, parsed elsewhere as `catalog-schema` or schema name.
    pub db: Option<String>,
    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    // `arrow`],
    // the default value is `greptimedb_v1`
    pub format: Option<String>,
    // For arrow output
    pub compression: Option<String>,
    // Returns epoch timestamps with the specified precision.
    // Both u and µ indicate microseconds.
    // epoch = [ns,u,µ,ms,s],
    //
    // For influx output only
    //
    // TODO(jeremy): currently, only InfluxDB result format is supported,
    // and all columns of the `Timestamp` type will be converted to their
    // specified time precision. Maybe greptimedb format can support this
    // param too.
    pub epoch: Option<String>,
}
319
320impl From<PromqlQuery> for PromQuery {
321    fn from(query: PromqlQuery) -> Self {
322        PromQuery {
323            query: query.query,
324            start: query.start,
325            end: query.end,
326            step: query.step,
327            lookback: query
328                .lookback
329                .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
330        }
331    }
332}
333
/// Handler to execute promql
///
/// Evaluates a PromQL range query and serializes the result in the requested
/// `format` (defaulting to `greptimedb_v1`).
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
pub async fn promql(
    State(state): State<ApiState>,
    Query(params): Query<PromqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
) -> Response {
    let sql_handler = &state.sql_handler;
    let exec_start = Instant::now();
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Promql);
    let query_ctx = Arc::new(query_ctx);

    // Per-database latency histogram guard, held for the rest of the handler.
    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    // Reject the request early when the target schema does not exist.
    let resp = if let Some((status, msg)) =
        validate_schema(sql_handler.clone(), query_ctx.clone()).await
    {
        let resp = ErrorResponse::from_error_message(status, msg);
        HttpResponse::Error(resp)
    } else {
        // Unrecognized `format`/`epoch` values silently fall back to the
        // defaults (`greptimedb_v1` and millisecond precision respectively).
        let format = params
            .format
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
            .unwrap_or(ResponseFormat::GreptimedbV1);
        let epoch = params
            .epoch
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
        // Cloned out before `params` is consumed by the conversion below.
        let compression = params.compression.clone();

        let prom_query = params.into();
        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;

        match format {
            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
            ResponseFormat::Csv(with_names, with_types) => {
                CsvResponse::from_output(outputs, with_names, with_types).await
            }
            ResponseFormat::Table => TableResponse::from_output(outputs).await,
            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
            ResponseFormat::Null => NullResponse::from_output(outputs).await,
        }
    };

    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
        .into_response()
}
391
/// Handler to export metrics
///
/// Returns the metrics rendered by the [`MetricsHandler`] state as plain text.
#[axum_macros::debug_handler]
pub async fn metrics(
    State(state): State<MetricsHandler>,
    Query(_params): Query<HashMap<String, String>>,
) -> String {
    // A default ProcessCollector is registered automatically in prometheus.
    // We do not need to explicitly collect process-related data.
    // But ProcessCollector only support on linux.

    // Refresh jemalloc allocator stats before rendering; failures are logged
    // rather than failing the whole scrape. Not compiled on Windows.
    #[cfg(not(windows))]
    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
        if let Err(e) = c.update() {
            common_telemetry::error!(e; "Failed to update jemalloc metrics");
        }
    }
    state.render()
}
410
/// Query parameters for the health endpoint (none are currently defined).
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}

/// Empty payload returned by the health endpoint (serializes to `{}`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}
416
/// Handler to export healthy check
///
/// Currently simply return status "200 OK" (default) with an empty json payload "{}"
#[axum_macros::debug_handler]
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
    // Reaching this handler at all proves the HTTP server is serving requests.
    Json(HealthResponse {})
}
424
/// Build and runtime information returned by the [`status`] handler.
///
/// The `&'a str` fields borrow from `'static` build info in practice
/// (see the `StatusResponse<'static>` return type of [`status`]).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    pub source_time: &'a str,
    pub commit: &'a str,
    pub branch: &'a str,
    pub rustc_version: &'a str,
    // Resolved from the OS at request time; "unknown" when unavailable.
    pub hostname: String,
    pub version: &'a str,
}
434
435/// Handler to expose information info about runtime, build, etc.
436#[axum_macros::debug_handler]
437pub async fn status() -> Json<StatusResponse<'static>> {
438    let hostname = hostname::get()
439        .map(|s| s.to_string_lossy().to_string())
440        .unwrap_or_else(|_| "unknown".to_string());
441    let build_info = common_version::build_info();
442    Json(StatusResponse {
443        source_time: build_info.source_time,
444        commit: build_info.commit,
445        branch: build_info.branch,
446        rustc_version: build_info.rustc,
447        hostname,
448        version: build_info.version,
449    })
450}
451
/// Handler to expose configuration information info about runtime, build, etc.
#[axum_macros::debug_handler]
pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
    // Serves the pre-rendered `greptime_config_options` as the response body
    // with an explicit 200 OK status.
    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
}
457
458async fn validate_schema(
459    sql_handler: ServerSqlQueryHandlerRef,
460    query_ctx: QueryContextRef,
461) -> Option<(StatusCode, String)> {
462    match sql_handler
463        .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
464        .await
465    {
466        Ok(true) => None,
467        Ok(false) => Some((
468            StatusCode::DatabaseNotFound,
469            format!("Database not found: {}", query_ctx.get_db_string()),
470        )),
471        Err(e) => Some((
472            StatusCode::Internal,
473            format!(
474                "Error checking database: {}, {}",
475                query_ctx.get_db_string(),
476                e.output_msg(),
477            ),
478        )),
479    }
480}