servers/http/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Json, Query, State};
20use axum::response::{IntoResponse, Response};
21use axum::{Extension, Form};
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_plugins::GREPTIME_EXEC_WRITE_COST;
26use common_query::{Output, OutputData};
27use common_recordbatch::util;
28use common_telemetry::tracing;
29use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use session::context::{Channel, QueryContext, QueryContextRef};
33use snafu::ResultExt;
34use sql::dialect::GreptimeDbDialect;
35use sql::parser::{ParseOptions, ParserContext};
36use sql::statements::statement::Statement;
37
38use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
39use crate::http::header::collect_plan_metrics;
40use crate::http::result::arrow_result::ArrowResponse;
41use crate::http::result::csv_result::CsvResponse;
42use crate::http::result::error_result::ErrorResponse;
43use crate::http::result::greptime_result_v1::GreptimedbV1Response;
44use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
45use crate::http::result::json_result::JsonResponse;
46use crate::http::result::table_result::TableResponse;
47use crate::http::{
48    ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
49    HttpResponse, ResponseFormat,
50};
51use crate::metrics_handler::MetricsHandler;
52use crate::query_handler::sql::ServerSqlQueryHandlerRef;
53
54#[derive(Debug, Default, Serialize, Deserialize)]
55pub struct SqlQuery {
56    pub db: Option<String>,
57    pub sql: Option<String>,
58    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
59    // `arrow`],
60    // the default value is `greptimedb_v1`
61    pub format: Option<String>,
62    // Returns epoch timestamps with the specified precision.
63    // Both u and µ indicate microseconds.
64    // epoch = [ns,u,µ,ms,s],
65    //
66    // TODO(jeremy): currently, only InfluxDB result format is supported,
67    // and all columns of the `Timestamp` type will be converted to their
68    // specified time precision. Maybe greptimedb format can support this
69    // param too.
70    pub epoch: Option<String>,
71    pub limit: Option<usize>,
72    // For arrow output
73    pub compression: Option<String>,
74}
75
76/// Handler to execute sql
77#[axum_macros::debug_handler]
78#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
79pub async fn sql(
80    State(state): State<ApiState>,
81    Query(query_params): Query<SqlQuery>,
82    Extension(mut query_ctx): Extension<QueryContext>,
83    Form(form_params): Form<SqlQuery>,
84) -> HttpResponse {
85    let start = Instant::now();
86    let sql_handler = &state.sql_handler;
87    if let Some(db) = &query_params.db.or(form_params.db) {
88        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
89        query_ctx.set_current_catalog(&catalog);
90        query_ctx.set_current_schema(&schema);
91    }
92    let db = query_ctx.get_db_string();
93
94    query_ctx.set_channel(Channel::Http);
95    let query_ctx = Arc::new(query_ctx);
96
97    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
98        .with_label_values(&[db.as_str()])
99        .start_timer();
100
101    let sql = query_params.sql.or(form_params.sql);
102    let format = query_params
103        .format
104        .or(form_params.format)
105        .map(|s| s.to_lowercase())
106        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
107        .unwrap_or(ResponseFormat::GreptimedbV1);
108    let epoch = query_params
109        .epoch
110        .or(form_params.epoch)
111        .map(|s| s.to_lowercase())
112        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
113
114    let result = if let Some(sql) = &sql {
115        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
116            Err((status, msg))
117        } else {
118            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
119        }
120    } else {
121        Err((
122            StatusCode::InvalidArguments,
123            "sql parameter is required.".to_string(),
124        ))
125    };
126
127    let outputs = match result {
128        Err((status, msg)) => {
129            return HttpResponse::Error(
130                ErrorResponse::from_error_message(status, msg)
131                    .with_execution_time(start.elapsed().as_millis() as u64),
132            );
133        }
134        Ok(outputs) => outputs,
135    };
136
137    let mut resp = match format {
138        ResponseFormat::Arrow => {
139            ArrowResponse::from_output(outputs, query_params.compression).await
140        }
141        ResponseFormat::Csv(with_names, with_types) => {
142            CsvResponse::from_output(outputs, with_names, with_types).await
143        }
144        ResponseFormat::Table => TableResponse::from_output(outputs).await,
145        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
146        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
147        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
148    };
149
150    if let Some(limit) = query_params.limit {
151        resp = resp.with_limit(limit);
152    }
153    resp.with_execution_time(start.elapsed().as_millis() as u64)
154}
155
156/// Handler to parse sql
157#[axum_macros::debug_handler]
158#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
159pub async fn sql_parse(
160    Query(query_params): Query<SqlQuery>,
161    Form(form_params): Form<SqlQuery>,
162) -> Result<Json<Vec<Statement>>> {
163    let Some(sql) = query_params.sql.or(form_params.sql) else {
164        return InvalidQuerySnafu {
165            reason: "sql parameter is required.",
166        }
167        .fail();
168    };
169
170    let stmts =
171        ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
172            .context(FailedToParseQuerySnafu)?;
173
174    Ok(stmts.into())
175}
176
177/// Create a response from query result
178pub async fn from_output(
179    outputs: Vec<crate::error::Result<Output>>,
180) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
181    // TODO(sunng87): this api response structure cannot represent error well.
182    //  It hides successful execution results from error response
183    let mut results = Vec::with_capacity(outputs.len());
184    let mut merge_map = HashMap::new();
185
186    for out in outputs {
187        match out {
188            Ok(o) => match o.data {
189                OutputData::AffectedRows(rows) => {
190                    results.push(GreptimeQueryOutput::AffectedRows(rows));
191                    if o.meta.cost > 0 {
192                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
193                    }
194                }
195                OutputData::Stream(stream) => {
196                    let schema = stream.schema().clone();
197                    // TODO(sunng87): streaming response
198                    let mut http_record_output = match util::collect(stream).await {
199                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
200                            Ok(rows) => rows,
201                            Err(err) => {
202                                return Err(ErrorResponse::from_error(err));
203                            }
204                        },
205                        Err(err) => {
206                            return Err(ErrorResponse::from_error(err));
207                        }
208                    };
209                    if let Some(physical_plan) = o.meta.plan {
210                        let mut result_map = HashMap::new();
211
212                        let mut tmp = vec![&mut merge_map, &mut result_map];
213                        collect_plan_metrics(&physical_plan, &mut tmp);
214                        let re = result_map
215                            .into_iter()
216                            .map(|(k, v)| (k, Value::from(v)))
217                            .collect::<HashMap<String, Value>>();
218                        http_record_output.metrics.extend(re);
219                    }
220                    results.push(GreptimeQueryOutput::Records(http_record_output))
221                }
222                OutputData::RecordBatches(rbs) => {
223                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
224                        Ok(rows) => {
225                            results.push(GreptimeQueryOutput::Records(rows));
226                        }
227                        Err(err) => {
228                            return Err(ErrorResponse::from_error(err));
229                        }
230                    }
231                }
232            },
233
234            Err(err) => {
235                return Err(ErrorResponse::from_error(err));
236            }
237        }
238    }
239
240    let merge_map = merge_map
241        .into_iter()
242        .map(|(k, v)| (k, Value::from(v)))
243        .collect();
244
245    Ok((results, merge_map))
246}
247
248#[derive(Debug, Default, Serialize, Deserialize)]
249pub struct PromqlQuery {
250    pub query: String,
251    pub start: String,
252    pub end: String,
253    pub step: String,
254    pub lookback: Option<String>,
255    pub db: Option<String>,
256    // (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
257    // `arrow`],
258    // the default value is `greptimedb_v1`
259    pub format: Option<String>,
260    // For arrow output
261    pub compression: Option<String>,
262    // Returns epoch timestamps with the specified precision.
263    // Both u and µ indicate microseconds.
264    // epoch = [ns,u,µ,ms,s],
265    //
266    // For influx output only
267    //
268    // TODO(jeremy): currently, only InfluxDB result format is supported,
269    // and all columns of the `Timestamp` type will be converted to their
270    // specified time precision. Maybe greptimedb format can support this
271    // param too.
272    pub epoch: Option<String>,
273}
274
275impl From<PromqlQuery> for PromQuery {
276    fn from(query: PromqlQuery) -> Self {
277        PromQuery {
278            query: query.query,
279            start: query.start,
280            end: query.end,
281            step: query.step,
282            lookback: query
283                .lookback
284                .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
285        }
286    }
287}
288
289/// Handler to execute promql
290#[axum_macros::debug_handler]
291#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
292pub async fn promql(
293    State(state): State<ApiState>,
294    Query(params): Query<PromqlQuery>,
295    Extension(mut query_ctx): Extension<QueryContext>,
296) -> Response {
297    let sql_handler = &state.sql_handler;
298    let exec_start = Instant::now();
299    let db = query_ctx.get_db_string();
300
301    query_ctx.set_channel(Channel::Http);
302    let query_ctx = Arc::new(query_ctx);
303
304    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
305        .with_label_values(&[db.as_str()])
306        .start_timer();
307
308    let resp = if let Some((status, msg)) =
309        validate_schema(sql_handler.clone(), query_ctx.clone()).await
310    {
311        let resp = ErrorResponse::from_error_message(status, msg);
312        HttpResponse::Error(resp)
313    } else {
314        let format = params
315            .format
316            .as_ref()
317            .map(|s| s.to_lowercase())
318            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
319            .unwrap_or(ResponseFormat::GreptimedbV1);
320        let epoch = params
321            .epoch
322            .as_ref()
323            .map(|s| s.to_lowercase())
324            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
325        let compression = params.compression.clone();
326
327        let prom_query = params.into();
328        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
329
330        match format {
331            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
332            ResponseFormat::Csv(with_names, with_types) => {
333                CsvResponse::from_output(outputs, with_names, with_types).await
334            }
335            ResponseFormat::Table => TableResponse::from_output(outputs).await,
336            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
337            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
338            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
339        }
340    };
341
342    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
343        .into_response()
344}
345
346/// Handler to export metrics
347#[axum_macros::debug_handler]
348pub async fn metrics(
349    State(state): State<MetricsHandler>,
350    Query(_params): Query<HashMap<String, String>>,
351) -> String {
352    // A default ProcessCollector is registered automatically in prometheus.
353    // We do not need to explicitly collect process-related data.
354    // But ProcessCollector only support on linux.
355
356    #[cfg(not(windows))]
357    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref() {
358        if let Err(e) = c.update() {
359            common_telemetry::error!(e; "Failed to update jemalloc metrics");
360        }
361    }
362    state.render()
363}
364
365#[derive(Debug, Serialize, Deserialize)]
366pub struct HealthQuery {}
367
368#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
369pub struct HealthResponse {}
370
371/// Handler to export healthy check
372///
373/// Currently simply return status "200 OK" (default) with an empty json payload "{}"
374#[axum_macros::debug_handler]
375pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
376    Json(HealthResponse {})
377}
378
379#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
380pub struct StatusResponse<'a> {
381    pub source_time: &'a str,
382    pub commit: &'a str,
383    pub branch: &'a str,
384    pub rustc_version: &'a str,
385    pub hostname: String,
386    pub version: &'a str,
387}
388
389/// Handler to expose information info about runtime, build, etc.
390#[axum_macros::debug_handler]
391pub async fn status() -> Json<StatusResponse<'static>> {
392    let hostname = hostname::get()
393        .map(|s| s.to_string_lossy().to_string())
394        .unwrap_or_else(|_| "unknown".to_string());
395    let build_info = common_version::build_info();
396    Json(StatusResponse {
397        source_time: build_info.source_time,
398        commit: build_info.commit,
399        branch: build_info.branch,
400        rustc_version: build_info.rustc,
401        hostname,
402        version: build_info.version,
403    })
404}
405
406/// Handler to expose configuration information info about runtime, build, etc.
407#[axum_macros::debug_handler]
408pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
409    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
410}
411
412async fn validate_schema(
413    sql_handler: ServerSqlQueryHandlerRef,
414    query_ctx: QueryContextRef,
415) -> Option<(StatusCode, String)> {
416    match sql_handler
417        .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
418        .await
419    {
420        Ok(true) => None,
421        Ok(false) => Some((
422            StatusCode::DatabaseNotFound,
423            format!("Database not found: {}", query_ctx.get_db_string()),
424        )),
425        Err(e) => Some((
426            StatusCode::Internal,
427            format!(
428                "Error checking database: {}, {}",
429                query_ctx.get_db_string(),
430                e.output_msg(),
431            ),
432        )),
433    }
434}