Skip to main content

servers/http/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use axum::extract::rejection::FormRejection;
20use axum::extract::{Json, Query, State};
21use axum::response::sse::{Event, KeepAlive, Sse};
22use axum::response::{IntoResponse, Response};
23use axum::{Extension, Form};
24use common_catalog::parse_catalog_and_schema_from_db_string;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_plugins::GREPTIME_EXEC_WRITE_COST;
28use common_query::{Output, OutputData};
29use common_recordbatch::{RecordBatch, SendableRecordBatchStream, util};
30use common_telemetry::tracing;
31use datafusion::physical_plan::ExecutionPlan;
32use datatypes::schema::SchemaRef;
33use futures::StreamExt;
34use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery};
35use serde::{Deserialize, Serialize};
36use serde_json::Value;
37use session::context::{Channel, QueryContext, QueryContextRef};
38use snafu::ResultExt;
39use sql::dialect::GreptimeDbDialect;
40use sql::parser::{ParseOptions, ParserContext};
41use sql::statements::statement::Statement;
42
43use crate::error::{FailedToParseQuerySnafu, InvalidQuerySnafu, Result};
44use crate::http::header::collect_plan_metrics;
45use crate::http::result::arrow_result::ArrowResponse;
46use crate::http::result::csv_result::CsvResponse;
47use crate::http::result::error_result::ErrorResponse;
48use crate::http::result::greptime_result_v1::GreptimedbV1Response;
49use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
50use crate::http::result::json_result::JsonResponse;
51use crate::http::result::null_result::NullResponse;
52use crate::http::result::table_result::TableResponse;
53use crate::http::{
54    ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
55    HttpResponse, ResponseFormat,
56};
57use crate::metrics_handler::MetricsHandler;
58use crate::query_handler::sql::ServerSqlQueryHandlerRef;
59
/// Parameters accepted by the SQL HTTP handlers in this file.
///
/// The same structure is deserialized from both the URL query string and the
/// form body; handlers combine the two with `query.or(form)`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SqlQuery {
    /// Database string; handlers pass it to
    /// `parse_catalog_and_schema_from_db_string` to set the current catalog
    /// and schema on the query context.
    pub db: Option<String>,
    /// SQL text to process. Handlers reject the request when it is missing.
    pub sql: Option<String>,
    /// (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    /// `arrow`],
    /// the default value is `greptimedb_v1`
    ///
    /// NOTE(review): the `sql` handler also matches table, json and null
    /// formats, so this list looks incomplete; see `ResponseFormat::parse` for
    /// the accepted names.
    pub format: Option<String>,
    /// Returns epoch timestamps with the specified precision.
    /// Both u and µ indicate microseconds.
    /// epoch = [ns,u,µ,ms,s],
    ///
    /// TODO(jeremy): currently, only InfluxDB result format is supported,
    /// and all columns of the `Timestamp` type will be converted to their
    /// specified time precision. Maybe greptimedb format can support this
    /// param too.
    pub epoch: Option<String>,
    /// Forwarded to `with_limit` on the response built by the `sql` handler.
    pub limit: Option<usize>,
    /// For arrow output
    pub compression: Option<String>,
    /// Snapshot interval in milliseconds requested for `sql_analyze_stream`;
    /// that handler applies a default and clamps the value.
    pub snapshot_interval_ms: Option<u64>,
}
82
/// Snapshot interval (ms) used by `sql_analyze_stream` when the request gives none.
const DEFAULT_ANALYZE_SNAPSHOT_INTERVAL_MS: u64 = 5000;
/// Lower bound (ms) the requested snapshot interval is clamped to.
const MIN_ANALYZE_SNAPSHOT_INTERVAL_MS: u64 = 1000;
/// Upper bound (ms) the requested snapshot interval is clamped to.
const MAX_ANALYZE_SNAPSHOT_INTERVAL_MS: u64 = 60000;
86
/// JSON body carried in the `data` field of every SSE event emitted by
/// `sql_analyze_stream`. Optional fields are omitted from the JSON when `None`.
#[derive(Serialize)]
struct AnalyzeStreamPayload {
    /// Sequence number of the event within one stream.
    seq: u64,
    /// Event state; callers in this file pass `metrics`, `final`, `error` or
    /// `canceled`.
    state: &'static str,
    /// `true` for intermediate metrics snapshots, `false` for terminal events.
    partial: bool,
    /// Milliseconds elapsed since the handler's start instant.
    elapsed_ms: u64,
    /// Plan metrics snapshot; absent when there is no plan or the conversion
    /// to JSON failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    metrics: Option<Value>,
    /// Query result, only set on the `final` event.
    #[serde(skip_serializing_if = "Option::is_none")]
    output: Option<GreptimeQueryOutput>,
    /// Error message, set on `error` and `canceled` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    reason: Option<String>,
    /// Numeric status code, set on `error` and `canceled` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    code: Option<u32>,
}
102
/// State threaded through the `futures::stream::unfold` loop that drives
/// `sql_analyze_stream`.
struct AnalyzeStreamState {
    /// Record batch stream of the running query.
    stream: SendableRecordBatchStream,
    /// Schema of `stream`, captured before the stream is polled.
    schema: SchemaRef,
    /// Physical plan taken from the query output metadata, if any; metrics
    /// snapshots are only emitted when it is present.
    plan: Option<Arc<dyn ExecutionPlan>>,
    /// Batches received so far; converted into the final output once the
    /// stream ends.
    batches: Vec<RecordBatch>,
    /// Sequence number assigned to the next emitted event.
    seq: u64,
    /// Instant the handler started, used for `elapsed_ms`.
    start: Instant,
    /// Snapshot interval (ms) after defaulting and clamping.
    requested_interval_ms: u64,
    /// Interval (ms) used for the next timer; updated from
    /// `adaptive_interval_ms` after each metrics event.
    current_interval_ms: u64,
    /// Set once a terminal event has been emitted so the stream ends on the
    /// next poll.
    done: bool,
}
114
115/// Handler to execute sql
116#[axum_macros::debug_handler]
117#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
118pub async fn sql(
119    State(state): State<ApiState>,
120    Query(query_params): Query<SqlQuery>,
121    Extension(mut query_ctx): Extension<QueryContext>,
122    Form(form_params): Form<SqlQuery>,
123) -> HttpResponse {
124    let start = Instant::now();
125    let sql_handler = &state.sql_handler;
126    if let Some(db) = &query_params.db.or(form_params.db) {
127        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
128        query_ctx.set_current_catalog(&catalog);
129        query_ctx.set_current_schema(&schema);
130    }
131    let db = query_ctx.get_db_string();
132
133    query_ctx.set_channel(Channel::HttpSql);
134    let query_ctx = Arc::new(query_ctx);
135
136    let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED
137        .with_label_values(&[db.as_str()])
138        .start_timer();
139
140    let sql = query_params.sql.or(form_params.sql);
141    let format = query_params
142        .format
143        .or(form_params.format)
144        .map(|s| s.to_lowercase())
145        .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
146        .unwrap_or(ResponseFormat::GreptimedbV1);
147    let epoch = query_params
148        .epoch
149        .or(form_params.epoch)
150        .map(|s| s.to_lowercase())
151        .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
152
153    let result = if let Some(sql) = &sql {
154        if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
155            Err((status, msg))
156        } else {
157            Ok(sql_handler.do_query(sql, query_ctx.clone()).await)
158        }
159    } else {
160        Err((
161            StatusCode::InvalidArguments,
162            "sql parameter is required.".to_string(),
163        ))
164    };
165
166    let outputs = match result {
167        Err((status, msg)) => {
168            return HttpResponse::Error(
169                ErrorResponse::from_error_message(status, msg)
170                    .with_execution_time(start.elapsed().as_millis() as u64),
171            );
172        }
173        Ok(outputs) => outputs,
174    };
175
176    let mut resp = match format {
177        ResponseFormat::Arrow => {
178            ArrowResponse::from_output(outputs, query_params.compression).await
179        }
180        ResponseFormat::Csv(with_names, with_types) => {
181            CsvResponse::from_output(outputs, with_names, with_types).await
182        }
183        ResponseFormat::Table => TableResponse::from_output(outputs).await,
184        ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
185        ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
186        ResponseFormat::Json => JsonResponse::from_output(outputs).await,
187        ResponseFormat::Null => NullResponse::from_output(outputs).await,
188    };
189
190    if let Some(limit) = query_params.limit {
191        resp = resp.with_limit(limit);
192    }
193    resp.with_execution_time(start.elapsed().as_millis() as u64)
194}
195
/// Handler to stream partial `EXPLAIN ANALYZE VERBOSE` metrics as SSE.
///
/// This experimental endpoint is POST-only SSE, so browser `EventSource` does
/// not apply. Each `metrics` event carries a complete snapshot (not a delta);
/// large snapshots are throttled but never truncated. `final`, `canceled`, and
/// `error` are terminal events. If the client disconnects it won't receive a
/// `canceled` event, but the production frontend stream is dropped and
/// best-effort cancels the underlying query.
///
/// Parameters are read from the query string first and the form body second.
/// `snapshot_interval_ms` defaults to `DEFAULT_ANALYZE_SNAPSHOT_INTERVAL_MS`
/// and is clamped to the `MIN_`/`MAX_ANALYZE_SNAPSHOT_INTERVAL_MS` range.
///
/// Failures detected before streaming starts (bad form body, missing `sql`,
/// invalid schema, query start failure, non-stream output) are returned as a
/// regular `ErrorResponse` rather than as SSE events.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "http", request_type = "sql_analyze_stream")
)]
pub async fn sql_analyze_stream(
    State(state): State<ApiState>,
    Query(query_params): Query<SqlQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    form_params: std::result::Result<Form<SqlQuery>, FormRejection>,
) -> Response {
    let start = Instant::now();
    // The form is extracted as a `Result` so a rejection can be inspected:
    // an unsupported media type is treated as "no form parameters"
    // (presumably so query-string-only requests work - confirm), any other
    // rejection is reported to the client as invalid arguments.
    let form_params = match form_params {
        Ok(Form(params)) => params,
        Err(err) => {
            if err.status() != axum::http::StatusCode::UNSUPPORTED_MEDIA_TYPE {
                return ErrorResponse::from_error_message(
                    StatusCode::InvalidArguments,
                    err.body_text(),
                )
                .with_execution_time(start.elapsed().as_millis() as u64)
                .into_response();
            }
            SqlQuery::default()
        }
    };
    let sql_handler = &state.sql_handler;
    if let Some(db) = &query_params.db.or(form_params.db) {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        query_ctx.set_current_catalog(&catalog);
        query_ctx.set_current_schema(&schema);
    }
    query_ctx.set_channel(Channel::HttpSql);
    let query_ctx = Arc::new(query_ctx);

    let Some(sql) = query_params.sql.or(form_params.sql) else {
        return ErrorResponse::from_error_message(
            StatusCode::InvalidArguments,
            "sql parameter is required.".to_string(),
        )
        .with_execution_time(start.elapsed().as_millis() as u64)
        .into_response();
    };
    if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await {
        return ErrorResponse::from_error_message(status, msg)
            .with_execution_time(start.elapsed().as_millis() as u64)
            .into_response();
    }

    // Requested snapshot cadence, defaulted and clamped to the allowed range.
    let interval_ms = query_params
        .snapshot_interval_ms
        .or(form_params.snapshot_interval_ms)
        .unwrap_or(DEFAULT_ANALYZE_SNAPSHOT_INTERVAL_MS)
        .clamp(
            MIN_ANALYZE_SNAPSHOT_INTERVAL_MS,
            MAX_ANALYZE_SNAPSHOT_INTERVAL_MS,
        );

    let output = match state
        .sql_handler
        .do_analyze_stream_query(&sql, query_ctx.clone())
        .await
    {
        Ok(output) => output,
        Err(err) => {
            return ErrorResponse::from_error(err)
                .with_execution_time(start.elapsed().as_millis() as u64)
                .into_response();
        }
    };

    // Keep the plan so metrics snapshots can be taken while the stream runs.
    let plan = output.meta.plan.clone();
    let OutputData::Stream(stream) = output.data else {
        return ErrorResponse::from_error_message(
            StatusCode::InvalidArguments,
            "analyze stream query must return a stream".to_string(),
        )
        .with_execution_time(start.elapsed().as_millis() as u64)
        .into_response();
    };
    let schema = stream.schema();

    // Each invocation of the closure below yields exactly one SSE event, or
    // `None` once a terminal event has already been sent.
    let sse_stream = futures::stream::unfold(
        AnalyzeStreamState {
            stream,
            schema,
            plan,
            batches: Vec::new(),
            seq: 0,
            start,
            requested_interval_ms: interval_ms,
            current_interval_ms: interval_ms,
            done: false,
        },
        |mut state| async move {
            if state.done {
                return None;
            }
            // A fresh timer is created for every emitted event, using the
            // interval chosen after the previous metrics snapshot.
            let tick = tokio::time::sleep(Duration::from_millis(state.current_interval_ms));
            tokio::pin!(tick);
            loop {
                tokio::select! {
                    item = state.stream.next() => {
                        match item {
                            // Buffer the batch and keep waiting; batches are only sent with the final event.
                            Some(Ok(batch)) => state.batches.push(batch),
                            // Stream failure: emit a terminal `canceled` or `error` event.
                            Some(Err(err)) => {
                                let status = err.status_code();
                                let event_name = if status == StatusCode::Cancelled { "canceled" } else { "error" };
                                let (payload, _) = make_analyze_payload(AnalyzePayloadArgs {
                                    seq: state.seq,
                                    state: event_name,
                                    partial: false,
                                    start: state.start,
                                    plan: state.plan.as_ref(),
                                    output: None,
                                    reason: Some(err.output_msg()),
                                    code: Some(status as u32),
                                });
                                state.seq += 1;
                                state.done = true;
                                return Some((Ok::<Event, std::convert::Infallible>(Event::default().event(event_name).data(payload)), state));
                            }
                            // Stream exhausted: convert the buffered batches and emit the
                            // terminal event (`final`, or `error` if the conversion fails).
                            None => {
                                let batches = std::mem::take(&mut state.batches);
                                let output = HttpRecordsOutput::try_new(state.schema.clone(), batches)
                                    .map(GreptimeQueryOutput::Records);
                                let (event_name, payload) = make_final_analyze_event(
                                    output.map_err(|err| (err.output_msg(), err.status_code() as u32)),
                                    state.seq,
                                    state.start,
                                    state.plan.as_ref(),
                                );
                                state.seq += 1;
                                state.done = true;
                                return Some((Ok::<Event, std::convert::Infallible>(Event::default().event(event_name).data(payload)), state));
                            }
                        }
                    }
                    _ = &mut tick => {
                        // Timer fired: emit a metrics snapshot if a plan is available.
                        if state.plan.is_some() {
                            let (payload, payload_bytes) = make_analyze_payload(AnalyzePayloadArgs {
                                seq: state.seq,
                                state: "metrics",
                                partial: true,
                                start: state.start,
                                plan: state.plan.as_ref(),
                                output: None,
                                reason: None,
                                code: None,
                            });
                            // Large snapshots lengthen the wait before the next one.
                            state.current_interval_ms = adaptive_interval_ms(payload_bytes, state.requested_interval_ms);
                            state.seq += 1;
                            return Some((Ok::<Event, std::convert::Infallible>(Event::default().event("metrics").data(payload)), state));
                        }
                        // No plan means nothing to report; re-arm the timer and keep draining the stream.
                        tick.as_mut().reset(tokio::time::Instant::now() + Duration::from_millis(state.current_interval_ms));
                    }
                }
            }
        },
    );

    // Keep-alive frames are sent every 15 seconds, independently of the events above.
    Sse::new(sse_stream)
        .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
        .into_response()
}
369
/// Picks the delay before the next metrics snapshot from the size of the last one.
///
/// Snapshots of at least 10 MiB enforce a floor of 30 s, snapshots of at
/// least 1 MiB a floor of 10 s; smaller ones use `requested_ms` unchanged. A
/// `requested_ms` already above the floor is never reduced.
fn adaptive_interval_ms(payload_bytes: usize, requested_ms: u64) -> u64 {
    const MIB: usize = 1024 * 1024;

    let floor_ms: u64 = match payload_bytes {
        size if size >= 10 * MIB => 30_000,
        size if size >= MIB => 10_000,
        _ => 0,
    };
    requested_ms.max(floor_ms)
}
379
380fn make_final_analyze_event(
381    output: std::result::Result<GreptimeQueryOutput, (String, u32)>,
382    seq: u64,
383    start: Instant,
384    plan: Option<&Arc<dyn ExecutionPlan>>,
385) -> (&'static str, String) {
386    match output {
387        Ok(output) => (
388            "final",
389            make_analyze_payload(AnalyzePayloadArgs {
390                seq,
391                state: "final",
392                partial: false,
393                start,
394                plan,
395                output: Some(output),
396                reason: None,
397                code: None,
398            })
399            .0,
400        ),
401        Err((reason, code)) => (
402            "error",
403            make_analyze_payload(AnalyzePayloadArgs {
404                seq,
405                state: "error",
406                partial: false,
407                start,
408                plan,
409                output: None,
410                reason: Some(reason),
411                code: Some(code),
412            })
413            .0,
414        ),
415    }
416}
417
/// Inputs of `make_analyze_payload`, grouped in a struct so call sites name
/// every value explicitly.
struct AnalyzePayloadArgs<'a> {
    /// Sequence number of the event.
    seq: u64,
    /// Event state written to the payload.
    state: &'static str,
    /// Whether the payload is an intermediate snapshot.
    partial: bool,
    /// Instant from which `elapsed_ms` is measured.
    start: Instant,
    /// Plan to read metrics from; no metrics are attached when `None`.
    plan: Option<&'a Arc<dyn ExecutionPlan>>,
    /// Query output to embed, if any.
    output: Option<GreptimeQueryOutput>,
    /// Error message to embed, if any.
    reason: Option<String>,
    /// Numeric status code to embed, if any.
    code: Option<u32>,
}
428
429fn make_analyze_payload(args: AnalyzePayloadArgs<'_>) -> (String, usize) {
430    let AnalyzePayloadArgs {
431        seq,
432        state,
433        partial,
434        start,
435        plan,
436        output,
437        reason,
438        code,
439    } = args;
440    let metrics = plan.and_then(|plan| query::analyze_plan_metrics_to_json_value(plan, true).ok());
441    let payload = AnalyzeStreamPayload {
442        seq,
443        state,
444        partial,
445        elapsed_ms: start.elapsed().as_millis() as u64,
446        metrics,
447        output,
448        reason,
449        code,
450    };
451    let payload_string = serde_json::to_string(&payload).unwrap_or_else(|e| {
452        serde_json::json!({
453            "seq": seq,
454            "state": "error",
455            "partial": false,
456            "reason": format!("Failed to serialize SSE payload: {e}"),
457        })
458        .to_string()
459    });
460    let payload_bytes = payload_string.len();
461    (payload_string, payload_bytes)
462}
463
464/// Handler to parse sql
465#[axum_macros::debug_handler]
466#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
467pub async fn sql_parse(
468    Query(query_params): Query<SqlQuery>,
469    Form(form_params): Form<SqlQuery>,
470) -> Result<Json<Vec<Statement>>> {
471    let Some(sql) = query_params.sql.or(form_params.sql) else {
472        return InvalidQuerySnafu {
473            reason: "sql parameter is required.",
474        }
475        .fail();
476    };
477
478    let stmts =
479        ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
480            .context(FailedToParseQuerySnafu)?;
481
482    Ok(stmts.into())
483}
484
/// JSON body returned by `sql_format`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SqlFormatResponse {
    /// The re-rendered statements, one per line, each terminated by `;`.
    pub formatted: String,
}
489
490/// Handler to format sql string
491#[axum_macros::debug_handler]
492#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql_format"))]
493pub async fn sql_format(
494    Query(query_params): Query<SqlQuery>,
495    Form(form_params): Form<SqlQuery>,
496) -> axum::response::Response {
497    let Some(sql) = query_params.sql.or(form_params.sql) else {
498        let resp = ErrorResponse::from_error_message(
499            StatusCode::InvalidArguments,
500            "sql parameter is required.".to_string(),
501        );
502        return HttpResponse::Error(resp).into_response();
503    };
504
505    // Parse using GreptimeDB dialect then reconstruct statements via Display
506    let stmts = match ParserContext::create_with_dialect(
507        &sql,
508        &GreptimeDbDialect {},
509        ParseOptions::default(),
510    ) {
511        Ok(v) => v,
512        Err(e) => return HttpResponse::Error(ErrorResponse::from_error(e)).into_response(),
513    };
514
515    let mut parts: Vec<String> = Vec::with_capacity(stmts.len());
516    for stmt in stmts {
517        let mut s = format!("{stmt}");
518        if !s.trim_end().ends_with(';') {
519            s.push(';');
520        }
521        parts.push(s);
522    }
523
524    let formatted = parts.join("\n");
525    Json(SqlFormatResponse { formatted }).into_response()
526}
527
528/// Create a response from query result
529pub async fn from_output(
530    outputs: Vec<crate::error::Result<Output>>,
531) -> std::result::Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
532    // TODO(sunng87): this api response structure cannot represent error well.
533    //  It hides successful execution results from error response
534    let mut results = Vec::with_capacity(outputs.len());
535    let mut merge_map = HashMap::new();
536
537    for out in outputs {
538        match out {
539            Ok(o) => match o.data {
540                OutputData::AffectedRows(rows) => {
541                    results.push(GreptimeQueryOutput::AffectedRows(rows));
542                    if o.meta.cost > 0 {
543                        merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
544                    }
545                }
546                OutputData::Stream(stream) => {
547                    let schema = stream.schema().clone();
548                    // TODO(sunng87): streaming response
549                    let mut http_record_output = match util::collect(stream).await {
550                        Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
551                            Ok(rows) => rows,
552                            Err(err) => {
553                                return Err(ErrorResponse::from_error(err));
554                            }
555                        },
556                        Err(err) => {
557                            return Err(ErrorResponse::from_error(err));
558                        }
559                    };
560                    if let Some(physical_plan) = o.meta.plan {
561                        let mut result_map = HashMap::new();
562
563                        let mut tmp = vec![&mut merge_map, &mut result_map];
564                        collect_plan_metrics(&physical_plan, &mut tmp);
565                        let re = result_map
566                            .into_iter()
567                            .map(|(k, v)| (k, Value::from(v)))
568                            .collect::<HashMap<String, Value>>();
569                        http_record_output.metrics.extend(re);
570                    }
571                    results.push(GreptimeQueryOutput::Records(http_record_output))
572                }
573                OutputData::RecordBatches(rbs) => {
574                    match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
575                        Ok(rows) => {
576                            results.push(GreptimeQueryOutput::Records(rows));
577                        }
578                        Err(err) => {
579                            return Err(ErrorResponse::from_error(err));
580                        }
581                    }
582                }
583            },
584
585            Err(err) => {
586                return Err(ErrorResponse::from_error(err));
587            }
588        }
589    }
590
591    let merge_map = merge_map
592        .into_iter()
593        .map(|(k, v)| (k, Value::from(v)))
594        .collect();
595
596    Ok((results, merge_map))
597}
598
/// Query-string parameters accepted by the `promql` handler.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PromqlQuery {
    /// PromQL expression, forwarded unchanged to `PromQuery`.
    pub query: String,
    /// Range start, forwarded unchanged to `PromQuery`.
    pub start: String,
    /// Range end, forwarded unchanged to `PromQuery`.
    pub end: String,
    /// Evaluation step, forwarded unchanged to `PromQuery`.
    pub step: String,
    /// Lookback; `DEFAULT_LOOKBACK_STRING` is used when absent.
    pub lookback: Option<String>,
    /// NOTE(review): the `promql` handler in this file never reads this
    /// field; presumably the database is resolved before the handler runs -
    /// confirm.
    pub db: Option<String>,
    /// (Optional) result format: [`greptimedb_v1`, `influxdb_v1`, `csv`,
    /// `arrow`],
    /// the default value is `greptimedb_v1`
    pub format: Option<String>,
    /// For arrow output
    pub compression: Option<String>,
    /// Returns epoch timestamps with the specified precision.
    /// Both u and µ indicate microseconds.
    /// epoch = [ns,u,µ,ms,s],
    ///
    /// For influx output only
    ///
    /// TODO(jeremy): currently, only InfluxDB result format is supported,
    /// and all columns of the `Timestamp` type will be converted to their
    /// specified time precision. Maybe greptimedb format can support this
    /// param too.
    pub epoch: Option<String>,
}
625
626impl From<PromqlQuery> for PromQuery {
627    fn from(query: PromqlQuery) -> Self {
628        PromQuery {
629            query: query.query,
630            start: query.start,
631            end: query.end,
632            step: query.step,
633            lookback: query
634                .lookback
635                .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
636            // TODO(dennis): support alias from http params or parse from query.query
637            alias: None,
638        }
639    }
640}
641
642/// Handler to execute promql
643#[axum_macros::debug_handler]
644#[tracing::instrument(skip_all, fields(protocol = "http", request_type = "promql"))]
645pub async fn promql(
646    State(state): State<ApiState>,
647    Query(params): Query<PromqlQuery>,
648    Extension(mut query_ctx): Extension<QueryContext>,
649) -> Response {
650    let sql_handler = &state.sql_handler;
651    let exec_start = Instant::now();
652    let db = query_ctx.get_db_string();
653
654    query_ctx.set_channel(Channel::Promql);
655    let query_ctx = Arc::new(query_ctx);
656
657    let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED
658        .with_label_values(&[db.as_str()])
659        .start_timer();
660
661    let resp = if let Some((status, msg)) =
662        validate_schema(sql_handler.clone(), query_ctx.clone()).await
663    {
664        let resp = ErrorResponse::from_error_message(status, msg);
665        HttpResponse::Error(resp)
666    } else {
667        let format = params
668            .format
669            .as_ref()
670            .map(|s| s.to_lowercase())
671            .map(|s| ResponseFormat::parse(s.as_str()).unwrap_or(ResponseFormat::GreptimedbV1))
672            .unwrap_or(ResponseFormat::GreptimedbV1);
673        let epoch = params
674            .epoch
675            .as_ref()
676            .map(|s| s.to_lowercase())
677            .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond));
678        let compression = params.compression.clone();
679
680        let prom_query = params.into();
681        let outputs = sql_handler.do_promql_query(&prom_query, query_ctx).await;
682
683        match format {
684            ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
685            ResponseFormat::Csv(with_names, with_types) => {
686                CsvResponse::from_output(outputs, with_names, with_types).await
687            }
688            ResponseFormat::Table => TableResponse::from_output(outputs).await,
689            ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
690            ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
691            ResponseFormat::Json => JsonResponse::from_output(outputs).await,
692            ResponseFormat::Null => NullResponse::from_output(outputs).await,
693        }
694    };
695
696    resp.with_execution_time(exec_start.elapsed().as_millis() as u64)
697        .into_response()
698}
699
700/// Handler to export metrics
701#[axum_macros::debug_handler]
702pub async fn metrics(
703    State(state): State<MetricsHandler>,
704    Query(_params): Query<HashMap<String, String>>,
705) -> String {
706    // A default ProcessCollector is registered automatically in prometheus.
707    // We do not need to explicitly collect process-related data.
708    // But ProcessCollector only support on linux.
709
710    #[cfg(not(windows))]
711    if let Some(c) = crate::metrics::jemalloc::JEMALLOC_COLLECTOR.as_ref()
712        && let Err(e) = c.update()
713    {
714        common_telemetry::error!(e; "Failed to update jemalloc metrics");
715    }
716    state.render()
717}
718
/// Query parameters of the `health` handler; it currently declares no fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthQuery {}
721
/// Body of the `health` response; it has no fields, so the payload is empty.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct HealthResponse {}
724
725/// Handler to export healthy check
726///
727/// Currently simply return status "200 OK" (default) with an empty json payload "{}"
728#[axum_macros::debug_handler]
729pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
730    Json(HealthResponse {})
731}
732
/// Body of the `status` response: build information plus the local hostname.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatusResponse<'a> {
    /// Commit reported by the build info.
    pub commit: &'a str,
    /// Branch reported by the build info.
    pub branch: &'a str,
    /// Rust compiler version reported by the build info.
    pub rustc_version: &'a str,
    /// Hostname of the machine, or `unknown` when it cannot be determined.
    pub hostname: String,
    /// Product version reported by the build info.
    pub version: &'a str,
}
741
742/// Handler to expose information info about runtime, build, etc.
743#[axum_macros::debug_handler]
744pub async fn status() -> Json<StatusResponse<'static>> {
745    let hostname = hostname::get()
746        .map(|s| s.to_string_lossy().to_string())
747        .unwrap_or_else(|_| "unknown".to_string());
748    let build_info = common_version::build_info();
749    Json(StatusResponse {
750        commit: build_info.commit,
751        branch: build_info.branch,
752        rustc_version: build_info.rustc,
753        hostname,
754        version: build_info.version,
755    })
756}
757
758/// Handler to expose configuration information info about runtime, build, etc.
759#[axum_macros::debug_handler]
760pub async fn config(State(state): State<GreptimeOptionsConfigState>) -> Response {
761    (axum::http::StatusCode::OK, state.greptime_config_options).into_response()
762}
763
764async fn validate_schema(
765    sql_handler: ServerSqlQueryHandlerRef,
766    query_ctx: QueryContextRef,
767) -> Option<(StatusCode, String)> {
768    match sql_handler
769        .is_valid_schema(query_ctx.current_catalog(), &query_ctx.current_schema())
770        .await
771    {
772        Ok(true) => None,
773        Ok(false) => Some((
774            StatusCode::DatabaseNotFound,
775            format!("Database not found: {}", query_ctx.get_db_string()),
776        )),
777        Err(e) => Some((
778            StatusCode::Internal,
779            format!(
780                "Error checking database: {}, {}",
781                query_ctx.get_db_string(),
782                e.output_msg(),
783            ),
784        )),
785    }
786}
787
788pub async fn index() -> axum::response::Html<String> {
789    let name = common_version::product_name();
790    let version = common_version::version();
791    axum::response::Html(format!(
792        r#"<!DOCTYPE html>
793<html>
794<head><title>{name}</title></head>
795<body>
796<h1>{name}</h1>
797<p>Version: {version}</p>
798<ul>
799<li><a href="/dashboard">Dashboard UI</a></li>
800<li><a href="/v1/health">Health</a> (JSON)</li>
801<li><a href="/status">Status</a> (JSON)</li>
802<li><a href="/metrics">Metrics</a> (For Prometheus Scrape)</li>
803<li><a href="/config">Config</a> (TXT)</li>
804</ul>
805</body>
806</html>"#,
807    ))
808}
809
#[cfg(test)]
mod tests {
    use super::*;

    /// A failed output conversion must surface as a terminal `error` event
    /// whose payload carries the failure reason.
    #[test]
    fn test_final_analyze_event_uses_error_event_for_conversion_error() {
        let failure = (
            "conversion failed".to_string(),
            StatusCode::InvalidArguments as u32,
        );

        let (event_name, payload) =
            make_final_analyze_event(Err(failure), 7, Instant::now(), None);

        assert_eq!(event_name, "error");
        let decoded: Value = serde_json::from_str(&payload).unwrap();
        assert_eq!(decoded["state"], "error");
        assert_eq!(decoded["reason"], "conversion failed");
    }
}
831}