servers/http/handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use axum::extract::{Json, Query, State};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_plugins::GREPTIME_EXEC_WRITE_COST;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::tracing;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::ResultExt;
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;

use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
use crate::http::header::collect_plan_metrics;
use crate::http::result::arrow_result::ArrowResponse;
use crate::http::result::csv_result::CsvResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
use crate::http::result::json_result::JsonResponse;
use crate::http::result::table_result::TableResponse;
use crate::http::{
    ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
    HttpResponse, ResponseFormat,
};
use crate::metrics_handler::MetricsHandler;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    pub db: Option<String>,
    pub sql: Option<String>,
    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    // `table`, `arrow`, `json`];
    // the default value is `greptimedb_v1`.
    pub format: Option<String>,
    // Returns epoch timestamps with the specified precision.
    // Both `u` and `µ` indicate microseconds.
    // epoch = [ns, u, µ, ms, s];
    //
    // TODO(jeremy): currently, only the InfluxDB result format is supported,
    // and all columns of the `Timestamp` type will be converted to the
    // specified time precision. Maybe the greptimedb format can support this
    // param too.
    pub epoch: Option<String>,
    pub limit: Option<usize>,
    // For arrow output
    pub compression: Option<String>,
}
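
// Illustrative only: a hypothetical request that would deserialize into
// `SqlQuery` (the `/v1/sql` route and all parameter values are assumptions,
// not guaranteed by this file):
//
//   GET /v1/sql?db=public&sql=SELECT%20*%20FROM%20numbers&format=csv&epoch=ms&limit=10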

/// Handler to execute SQL
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
pub async fn sql(
    State(state): State<ApiState>,
    Query(query_params): Query<SqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SqlQuery>,
) -> HttpResponse {
    let start = Instant::now();
    let sql_handler = &state.sql_handler;
    if let Some(db) = &query_params.db.or(form_params.db) {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        query_ctx.set_current_catalog(&catalog);
        query_ctx.set_current_schema(&schema);
    }
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let sql = query_params.sql.or(form_params.sql);
    let format = query_params
        .format
        .or(form_params.format)
        .map(|s| s.to_lowercase())
        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
        .unwrap_or(ResponseFormat::GreptimedbV1);
    let epoch = query_params
        .epoch
        .or(form_params.epoch)
        .map(|s| s.to_lowercase())
        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));

    let result = if let Some(sql) = &sql {
        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
            Err((status, msg))
        } else {
            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
        }
    } else {
        Err((
            StatusCode::InvalidArguments,
            "sql parameter is required.".to_string(),
        ))
    };

    let outputs = match result {
        Err((status, msg)) => {
            return HttpResponse::Error(
                ErrorResponse::from_error_message(status, msg)
                    .with_execution_time(start.elapsed().as_millis() as u64),
            );
        }
        Ok(outputs) => outputs,
    };

    let mut resp = match format {
        ResponseFormat::Arrow => {
            ArrowResponse::from_output(outputs, query_params.compression).await
        }
        ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
        ResponseFormat::Table => TableResponse::from_output(outputs).await,
        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
    };

    if let Some(limit) = query_params.limit {
        resp = resp.with_limit(limit);
    }
    resp.with_execution_time(start.elapsed().as_millis() as u64)
}
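
// A minimal usage sketch, assuming the handler is mounted at `/v1/sql` and the
// server listens on the default HTTP port 4000 (both are assumptions, not
// guaranteed by this file). Parameters may come from the query string or the
// form body, as the handler merges both:
//
//   curl -X POST 'http://localhost:4000/v1/sql?db=public' \
//        --data-urlencode 'sql=SELECT 1'
//
// The response format defaults to `greptimedb_v1` unless `format` is given.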

/// Handler to parse SQL
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
pub async fn sql_parse(
    Query(query_params): Query<SqlQuery>,
    Form(form_params): Form<SqlQuery>,
) -> Result<Json<Vec<Statement>>> {
    let Some(sql) = query_params.sql.or(form_params.sql) else {
        return InvalidQuerySnafu {
            reason: "sql parameter is required.",
        }
        .fail();
    };

    let stmts =
        ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
            .context(FailedToParseQuerySnafu)?;

    Ok(stmts.into())
}
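
// Illustrative only: this endpoint parses without executing, which is useful
// for validating statements client-side. The `/v1/sql/parse` route and port
// are assumptions:
//
//   curl 'http://localhost:4000/v1/sql/parse' --data-urlencode 'sql=SELECT 1'
//
// On success it returns the parsed `Statement` list as JSON; on a syntax error
// it fails with `FailedToParseQuery`.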

/// Create a response from query results
pub async fn from_output(
    outputs: Vec<crate::error::Result<Output>>,
) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
    // TODO(sunng87): this API response structure cannot represent errors well.
    //  It hides successful execution results from the error response.
    let mut results = Vec::with_capacity(outputs.len());
    let mut merge_map = HashMap::new();

    for out in outputs {
        match out {
            Ok(o) => match o.data {
                OutputData::AffectedRows(rows) => {
                    results.push(GreptimeQueryOutput::AffectedRows(rows));
                    if o.meta.cost > 0 {
                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
                    }
                }
                OutputData::Stream(stream) => {
                    let schema = stream.schema().clone();
                    // TODO(sunng87): streaming response
                    let mut http_record_output = match util::collect(stream).await {
                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
                            Ok(rows) => rows,
                            Err(err) => {
                                return Err(ErrorResponse::from_error(err));
                            }
                        },
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    };
                    if let Some(physical_plan) = o.meta.plan {
                        let mut result_map = HashMap::new();

                        let mut tmp = vec![&mut merge_map, &mut result_map];
                        collect_plan_metrics(&physical_plan, &mut tmp);
                        let re = result_map
                            .into_iter()
                            .map(|(k, v)| (k, Value::from(v)))
                            .collect::<HashMap<String, Value>>();
                        http_record_output.metrics.extend(re);
                    }
                    results.push(GreptimeQueryOutput::Records(http_record_output))
                }
                OutputData::RecordBatches(rbs) => {
                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
                        Ok(rows) => {
                            results.push(GreptimeQueryOutput::Records(rows));
                        }
                        Err(err) => {
                            return Err(ErrorResponse::from_error(err));
                        }
                    }
                }
            },

            Err(err) => {
                return Err(ErrorResponse::from_error(err));
            }
        }
    }

    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();

    Ok((results, merge_map))
}
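
// A sketch of the happy path above, with hypothetical values: an output of
// `OutputData::AffectedRows(3)` whose `meta.cost` is 10 yields
// `GreptimeQueryOutput::AffectedRows(3)` in `results` plus an entry keyed by
// `GREPTIME_EXEC_WRITE_COST` (value 10) in the merged metrics map, while any
// `Err` in `outputs` short-circuits the whole call into an `ErrorResponse`.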

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    pub query: String,
    pub start: String,
    pub end: String,
    pub step: String,
    pub lookback: Option<String>,
    pub db: Option<String>,
    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    // `table`, `arrow`, `json`];
    // the default value is `greptimedb_v1`.
    pub format: Option<String>,
    // For arrow output
    pub compression: Option<String>,
    // Returns epoch timestamps with the specified precision.
    // Both `u` and `µ` indicate microseconds.
    // epoch = [ns, u, µ, ms, s];
    //
    // For influx output only.
    //
    // TODO(jeremy): currently, only the InfluxDB result format is supported,
    // and all columns of the `Timestamp` type will be converted to the
    // specified time precision. Maybe the greptimedb format can support this
    // param too.
    pub epoch: Option<String>,
}
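
// Illustrative only (all values are made up): a query string that would
// deserialize into `PromqlQuery`; `lookback` falls back to
// `DEFAULT_LOOKBACK_STRING` when omitted, as the `From` impl below shows:
//
//   query=up&start=1676738180&end=1676738780&step=10s&lookback=5m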

impl From<PromqlQuery> for PromQuery {
    fn from(query: PromqlQuery) -> Self {
        PromQuery {
            query: query.query,
            start: query.start,
            end: query.end,
            step: query.step,
            lookback: query
                .lookback
                .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        }
    }
}

/// Handler to execute PromQL
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
pub async fn promql(
    State(state): State<ApiState>,
    Query(params): Query<PromqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
) -> Response {
    let sql_handler = &state.sql_handler;
    let exec_start = Instant::now();
    let db = query_ctx.get_db_string();

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
        .with_label_values(&[db.as_str()])
        .start_timer();

    let resp = if let Some((status, msg)) =
        validate_schema(sql_handler.clone(), query_ctx.clone()).await
    {
        let resp = ErrorResponse::from_error_message(status, msg);
        HttpResponse::Error(resp)
    } else {
        let format = params
            .format
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
            .unwrap_or(ResponseFormat::GreptimedbV1);
        let epoch = params
            .epoch
            .as_ref()
            .map(|s| s.to_lowercase())
            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
        let compression = params.compression.clone();

        let prom_query = params.into();
        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;

        match format {
            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
            ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
            ResponseFormat::Table => TableResponse::from_output(outputs).await,
            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
        }
    };

    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
        .into_response()
}
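
// A minimal usage sketch, assuming the handler is mounted at `/v1/promql` and
// the default HTTP port 4000 (both assumptions; timestamps are made up):
//
//   curl 'http://localhost:4000/v1/promql?query=up&start=1676738180&end=1676738780&step=10s&db=public'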

/// Handler to export metrics
#[axum_macros::debug_handler]
pub async fn metrics(
    State(state): State<MetricsHandler>,
    Query(_params): Query<HashMap<String, String>>,
) -> String {
    // A default ProcessCollector is registered automatically in prometheus.
    // We do not need to explicitly collect process-related data.
    // But ProcessCollector is only supported on Linux.

    #[cfg(not(windows))]
    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
        if let Err(e) = c.update() {
            common_telemetry::error!(e; "Failed to update jemalloc metrics");
        }
    }
    state.render()
}
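
// Illustrative only (route and port are assumptions): the handler renders the
// registry in the Prometheus text exposition format, so a scrape looks like:
//
//   curl 'http://localhost:4000/metrics'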

#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}

/// Handler for the health check
///
/// Currently it simply returns status "200 OK" (the default) with an empty JSON payload "{}"
#[axum_macros::debug_handler]
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
    Json(HealthResponse {})
}
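
// Illustrative only (route and port are assumptions):
//
//   curl 'http://localhost:4000/health'   ->   {}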

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    pub source_time: &'a str,
    pub commit: &'a str,
    pub branch: &'a str,
    pub rustc_version: &'a str,
    pub hostname: String,
    pub version: &'a str,
}

/// Handler to expose information about runtime, build, etc.
#[axum_macros::debug_handler]
pub async fn status() -> Json<StatusResponse<'static>> {
    let hostname = hostname::get()
        .map(|s| s.to_string_lossy().to_string())
        .unwrap_or_else(|_| "unknown".to_string());
    let build_info = common_version::build_info();
    Json(StatusResponse {
        source_time: build_info.source_time,
        commit: build_info.commit,
        branch: build_info.branch,
        rustc_version: build_info.rustc,
        hostname,
        version: build_info.version,
    })
}
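
// A sketch of the response shape with made-up values (the field names follow
// `StatusResponse`; actual values come from `common_version::build_info()`):
//
//   {"source_time":"2023-01-01T00:00:00Z","commit":"abc1234","branch":"main",
//    "rustc_version":"rustc 1.70.0","hostname":"host-1","version":"0.4.0"}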

/// Handler to expose the current configuration options of the server
#[axum_macros::debug_handler]
pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
}

async fn validate_schema(
    sql_handler: ServerSqlQueryHandlerRef,
    query_ctx: QueryContextRef,
) -> Option<(StatusCode, String)> {
    match sql_handler
        .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
        .await
    {
        Ok(true) => None,
        Ok(false) => Some((
            StatusCode::DatabaseNotFound,
            format!("Database not found: {}", query_ctx.get_db_string()),
        )),
        Err(e) => Some((
            StatusCode::Internal,
            format!(
                "Error checking database: {}, {}",
                query_ctx.get_db_string(),
                e.output_msg(),
            ),
        )),
    }
}
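
// Outcome summary for `validate_schema` (illustrative, with a hypothetical
// database name `my_db`):
//   schema exists        -> None (the request proceeds)
//   schema missing       -> Some((StatusCode::DatabaseNotFound, "Database not found: my_db"))
//   lookup itself failed -> Some((StatusCode::Internal, "Error checking database: my_db, <error>"))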