servers/http/
prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! prom supply the prometheus HTTP API Server compliance
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::StreamExt;
34use futures::future::join_all;
35use itertools::Itertools;
36use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
37use promql_parser::parser::token::{self};
38use promql_parser::parser::value::ValueType;
39use promql_parser::parser::{
40    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
41    SubqueryExpr, UnaryExpr, VectorSelector,
42};
43use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
44use query::promql::planner::normalize_matcher;
45use serde::de::{self, MapAccess, Visitor};
46use serde::{Deserialize, Serialize};
47use serde_json::Value;
48use session::context::{QueryContext, QueryContextRef};
49use snafu::{Location, OptionExt, ResultExt};
50use store_api::metric_engine_consts::{
51    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
52};
53
54pub use super::result::prometheus_resp::PrometheusJsonResponse;
55use crate::error::{
56    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
57    TableNotFoundSnafu, UnexpectedResultSnafu,
58};
59use crate::http::header::collect_plan_metrics;
60use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
61use crate::prometheus_handler::PrometheusHandlerRef;
62
63/// For [ValueType::Vector] result type
64#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
65pub struct PromSeriesVector {
66    pub metric: BTreeMap<String, String>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub value: Option<(f64, String)>,
69}
70
71/// For [ValueType::Matrix] result type
72#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
73pub struct PromSeriesMatrix {
74    pub metric: BTreeMap<String, String>,
75    pub values: Vec<(f64, String)>,
76}
77
78/// Variants corresponding to [ValueType]
79#[derive(Debug, Serialize, Deserialize, PartialEq)]
80#[serde(untagged)]
81pub enum PromQueryResult {
82    Matrix(Vec<PromSeriesMatrix>),
83    Vector(Vec<PromSeriesVector>),
84    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
85    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
86}
87
88impl Default for PromQueryResult {
89    fn default() -> Self {
90        PromQueryResult::Matrix(Default::default())
91    }
92}
93
94#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
95pub struct PromData {
96    #[serde(rename = "resultType")]
97    pub result_type: String,
98    pub result: PromQueryResult,
99}
100
101#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
102#[serde(untagged)]
103pub enum PrometheusResponse {
104    PromData(PromData),
105    Labels(Vec<String>),
106    Series(Vec<HashMap<String, String>>),
107    LabelValues(Vec<String>),
108    FormatQuery(String),
109    BuildInfo(OwnedBuildInfo),
110    #[serde(skip_deserializing)]
111    ParseResult(promql_parser::parser::Expr),
112    #[default]
113    None,
114}
115
116impl PrometheusResponse {
117    /// Append the other [`PrometheusResponse]`.
118    /// # NOTE
119    ///   Only append matrix and vector results, otherwise just ignore the other response.
120    fn append(&mut self, other: PrometheusResponse) {
121        match (self, other) {
122            (
123                PrometheusResponse::PromData(PromData {
124                    result: PromQueryResult::Matrix(lhs),
125                    ..
126                }),
127                PrometheusResponse::PromData(PromData {
128                    result: PromQueryResult::Matrix(rhs),
129                    ..
130                }),
131            ) => {
132                lhs.extend(rhs);
133            }
134
135            (
136                PrometheusResponse::PromData(PromData {
137                    result: PromQueryResult::Vector(lhs),
138                    ..
139                }),
140                PrometheusResponse::PromData(PromData {
141                    result: PromQueryResult::Vector(rhs),
142                    ..
143                }),
144            ) => {
145                lhs.extend(rhs);
146            }
147            _ => {
148                // TODO(dennis): process other cases?
149            }
150        }
151    }
152
153    pub fn is_none(&self) -> bool {
154        matches!(self, PrometheusResponse::None)
155    }
156}
157
158#[derive(Debug, Default, Serialize, Deserialize)]
159pub struct FormatQuery {
160    query: Option<String>,
161}
162
163#[axum_macros::debug_handler]
164#[tracing::instrument(
165    skip_all,
166    fields(protocol = "prometheus", request_type = "format_query")
167)]
168pub async fn format_query(
169    State(_handler): State<PrometheusHandlerRef>,
170    Query(params): Query<InstantQuery>,
171    Extension(_query_ctx): Extension<QueryContext>,
172    Form(form_params): Form<InstantQuery>,
173) -> PrometheusJsonResponse {
174    let query = params.query.or(form_params.query).unwrap_or_default();
175    match promql_parser::parser::parse(&query) {
176        Ok(expr) => {
177            let pretty = expr.prettify();
178            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
179        }
180        Err(reason) => {
181            let err = InvalidQuerySnafu { reason }.build();
182            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
183        }
184    }
185}
186
187#[derive(Debug, Default, Serialize, Deserialize)]
188pub struct BuildInfoQuery {}
189
190#[axum_macros::debug_handler]
191#[tracing::instrument(
192    skip_all,
193    fields(protocol = "prometheus", request_type = "build_info_query")
194)]
195pub async fn build_info_query() -> PrometheusJsonResponse {
196    let build_info = common_version::build_info().clone();
197    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
198}
199
200#[derive(Debug, Default, Serialize, Deserialize)]
201pub struct InstantQuery {
202    query: Option<String>,
203    lookback: Option<String>,
204    time: Option<String>,
205    timeout: Option<String>,
206    db: Option<String>,
207}
208
209/// Helper macro which try to evaluate the expression and return its results.
210/// If the evaluation fails, return a `PrometheusJsonResponse` early.
211macro_rules! try_call_return_response {
212    ($handle: expr) => {
213        match $handle {
214            Ok(res) => res,
215            Err(err) => {
216                let msg = err.to_string();
217                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
218            }
219        }
220    };
221}
222
223#[axum_macros::debug_handler]
224#[tracing::instrument(
225    skip_all,
226    fields(protocol = "prometheus", request_type = "instant_query")
227)]
228pub async fn instant_query(
229    State(handler): State<PrometheusHandlerRef>,
230    Query(params): Query<InstantQuery>,
231    Extension(mut query_ctx): Extension<QueryContext>,
232    Form(form_params): Form<InstantQuery>,
233) -> PrometheusJsonResponse {
234    // Extract time from query string, or use current server time if not specified.
235    let time = params
236        .time
237        .or(form_params.time)
238        .unwrap_or_else(current_time_rfc3339);
239    let prom_query = PromQuery {
240        query: params.query.or(form_params.query).unwrap_or_default(),
241        start: time.clone(),
242        end: time,
243        step: "1s".to_string(),
244        lookback: params
245            .lookback
246            .or(form_params.lookback)
247            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
248    };
249
250    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
251
252    // update catalog and schema in query context if necessary
253    if let Some(db) = &params.db {
254        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
255        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
256    }
257    let query_ctx = Arc::new(query_ctx);
258
259    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
260        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
261        .start_timer();
262
263    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
264        && !name_matchers.is_empty()
265    {
266        debug!("Find metric name matchers: {:?}", name_matchers);
267
268        let metric_names =
269            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
270
271        debug!("Find metric names: {:?}", metric_names);
272
273        if metric_names.is_empty() {
274            let result_type = promql_expr.value_type();
275
276            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
277                result_type: result_type.to_string(),
278                ..Default::default()
279            }));
280        }
281
282        let responses = join_all(metric_names.into_iter().map(|metric| {
283            let mut prom_query = prom_query.clone();
284            let mut promql_expr = promql_expr.clone();
285            let query_ctx = query_ctx.clone();
286            let handler = handler.clone();
287
288            async move {
289                update_metric_name_matcher(&mut promql_expr, &metric);
290                let new_query = promql_expr.to_string();
291                debug!(
292                    "Updated promql, before: {}, after: {}",
293                    &prom_query.query, new_query
294                );
295                prom_query.query = new_query;
296
297                do_instant_query(&handler, &prom_query, query_ctx).await
298            }
299        }))
300        .await;
301
302        responses
303            .into_iter()
304            .reduce(|mut acc, resp| {
305                acc.data.append(resp.data);
306                acc
307            })
308            .unwrap()
309    } else {
310        do_instant_query(&handler, &prom_query, query_ctx).await
311    }
312}
313
314/// Executes a single instant query and returns response
315async fn do_instant_query(
316    handler: &PrometheusHandlerRef,
317    prom_query: &PromQuery,
318    query_ctx: QueryContextRef,
319) -> PrometheusJsonResponse {
320    let result = handler.do_query(prom_query, query_ctx).await;
321    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
322        Ok((metric_name, result_type)) => (metric_name, result_type),
323        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
324    };
325    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
326}
327
328#[derive(Debug, Default, Serialize, Deserialize)]
329pub struct RangeQuery {
330    query: Option<String>,
331    start: Option<String>,
332    end: Option<String>,
333    step: Option<String>,
334    lookback: Option<String>,
335    timeout: Option<String>,
336    db: Option<String>,
337}
338
339#[axum_macros::debug_handler]
340#[tracing::instrument(
341    skip_all,
342    fields(protocol = "prometheus", request_type = "range_query")
343)]
344pub async fn range_query(
345    State(handler): State<PrometheusHandlerRef>,
346    Query(params): Query<RangeQuery>,
347    Extension(mut query_ctx): Extension<QueryContext>,
348    Form(form_params): Form<RangeQuery>,
349) -> PrometheusJsonResponse {
350    let prom_query = PromQuery {
351        query: params.query.or(form_params.query).unwrap_or_default(),
352        start: params.start.or(form_params.start).unwrap_or_default(),
353        end: params.end.or(form_params.end).unwrap_or_default(),
354        step: params.step.or(form_params.step).unwrap_or_default(),
355        lookback: params
356            .lookback
357            .or(form_params.lookback)
358            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
359    };
360
361    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
362
363    // update catalog and schema in query context if necessary
364    if let Some(db) = &params.db {
365        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
366        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
367    }
368    let query_ctx = Arc::new(query_ctx);
369    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
370        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
371        .start_timer();
372
373    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
374        && !name_matchers.is_empty()
375    {
376        debug!("Find metric name matchers: {:?}", name_matchers);
377
378        let metric_names =
379            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
380
381        debug!("Find metric names: {:?}", metric_names);
382
383        if metric_names.is_empty() {
384            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
385                result_type: ValueType::Matrix.to_string(),
386                ..Default::default()
387            }));
388        }
389
390        let responses = join_all(metric_names.into_iter().map(|metric| {
391            let mut prom_query = prom_query.clone();
392            let mut promql_expr = promql_expr.clone();
393            let query_ctx = query_ctx.clone();
394            let handler = handler.clone();
395
396            async move {
397                update_metric_name_matcher(&mut promql_expr, &metric);
398                let new_query = promql_expr.to_string();
399                debug!(
400                    "Updated promql, before: {}, after: {}",
401                    &prom_query.query, new_query
402                );
403                prom_query.query = new_query;
404
405                do_range_query(&handler, &prom_query, query_ctx).await
406            }
407        }))
408        .await;
409
410        // Safety: at least one responses, checked above
411        responses
412            .into_iter()
413            .reduce(|mut acc, resp| {
414                acc.data.append(resp.data);
415                acc
416            })
417            .unwrap()
418    } else {
419        do_range_query(&handler, &prom_query, query_ctx).await
420    }
421}
422
423/// Executes a single range query and returns response
424async fn do_range_query(
425    handler: &PrometheusHandlerRef,
426    prom_query: &PromQuery,
427    query_ctx: QueryContextRef,
428) -> PrometheusJsonResponse {
429    let result = handler.do_query(prom_query, query_ctx).await;
430    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
431        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
432        Ok((metric_name, _)) => metric_name,
433    };
434    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
435}
436
437#[derive(Debug, Default, Serialize)]
438struct Matches(Vec<String>);
439
440#[derive(Debug, Default, Serialize, Deserialize)]
441pub struct LabelsQuery {
442    start: Option<String>,
443    end: Option<String>,
444    lookback: Option<String>,
445    #[serde(flatten)]
446    matches: Matches,
447    db: Option<String>,
448}
449
450// Custom Deserialize method to support parsing repeated match[]
451impl<'de> Deserialize<'de> for Matches {
452    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
453    where
454        D: de::Deserializer<'de>,
455    {
456        struct MatchesVisitor;
457
458        impl<'d> Visitor<'d> for MatchesVisitor {
459            type Value = Vec<String>;
460
461            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
462                formatter.write_str("a string")
463            }
464
465            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
466            where
467                M: MapAccess<'d>,
468            {
469                let mut matches = Vec::new();
470                while let Some((key, value)) = access.next_entry::<String, String>()? {
471                    if key == "match[]" {
472                        matches.push(value);
473                    }
474                }
475                Ok(matches)
476            }
477        }
478        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
479    }
480}
481
482/// Handles schema errors, transforming a Result into an Option.
483/// - If the input is `Ok(v)`, returns `Some(v)`
484/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
485///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
486/// - If the input is `Err(err)` with any other error code, directly returns a
487///   `PrometheusJsonResponse::error`.
488macro_rules! handle_schema_err {
489    ($result:expr) => {
490        match $result {
491            Ok(v) => Some(v),
492            Err(err) => {
493                if err.status_code() == StatusCode::TableNotFound
494                    || err.status_code() == StatusCode::TableColumnNotFound
495                {
496                    // Prometheus won't report error if querying nonexist label and metric
497                    None
498                } else {
499                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
500                }
501            }
502        }
503    };
504}
505
506#[axum_macros::debug_handler]
507#[tracing::instrument(
508    skip_all,
509    fields(protocol = "prometheus", request_type = "labels_query")
510)]
511pub async fn labels_query(
512    State(handler): State<PrometheusHandlerRef>,
513    Query(params): Query<LabelsQuery>,
514    Extension(mut query_ctx): Extension<QueryContext>,
515    Form(form_params): Form<LabelsQuery>,
516) -> PrometheusJsonResponse {
517    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
518    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
519    let query_ctx = Arc::new(query_ctx);
520
521    let mut queries = params.matches.0;
522    if queries.is_empty() {
523        queries = form_params.matches.0;
524    }
525
526    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
527        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
528        .start_timer();
529
530    // Fetch all tag columns. It will be used as white-list for tag names.
531    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
532    {
533        Ok(labels) => labels,
534        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
535    };
536    // insert the special metric name label
537    let _ = labels.insert(METRIC_NAME.to_string());
538
539    // Fetch all columns if no query matcher is provided
540    if queries.is_empty() {
541        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
542        labels_vec.sort_unstable();
543        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
544    }
545
546    // Otherwise, run queries and extract column name from result set.
547    let start = params
548        .start
549        .or(form_params.start)
550        .unwrap_or_else(yesterday_rfc3339);
551    let end = params
552        .end
553        .or(form_params.end)
554        .unwrap_or_else(current_time_rfc3339);
555    let lookback = params
556        .lookback
557        .or(form_params.lookback)
558        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
559
560    let mut fetched_labels = HashSet::new();
561    let _ = fetched_labels.insert(METRIC_NAME.to_string());
562
563    let mut merge_map = HashMap::new();
564    for query in queries {
565        let prom_query = PromQuery {
566            query,
567            start: start.clone(),
568            end: end.clone(),
569            step: DEFAULT_LOOKBACK_STRING.to_string(),
570            lookback: lookback.clone(),
571        };
572
573        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
574        handle_schema_err!(
575            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
576                .await
577        );
578    }
579
580    // intersect `fetched_labels` with `labels` to filter out non-tag columns
581    fetched_labels.retain(|l| labels.contains(l));
582    let _ = labels.insert(METRIC_NAME.to_string());
583
584    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
585    sorted_labels.sort();
586    let merge_map = merge_map
587        .into_iter()
588        .map(|(k, v)| (k, Value::from(v)))
589        .collect();
590    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
591    resp.resp_metrics = merge_map;
592    resp
593}
594
595/// Get all tag column name of the given schema
596async fn get_all_column_names(
597    catalog: &str,
598    schema: &str,
599    manager: &CatalogManagerRef,
600) -> std::result::Result<HashSet<String>, catalog::error::Error> {
601    let table_names = manager.table_names(catalog, schema, None).await?;
602
603    let mut labels = HashSet::new();
604    for table_name in table_names {
605        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
606            continue;
607        };
608        for column in table.primary_key_columns() {
609            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
610                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
611            {
612                labels.insert(column.name);
613            }
614        }
615    }
616
617    Ok(labels)
618}
619
620async fn retrieve_series_from_query_result(
621    result: Result<Output>,
622    series: &mut Vec<HashMap<String, String>>,
623    query_ctx: &QueryContext,
624    table_name: &str,
625    manager: &CatalogManagerRef,
626    metrics: &mut HashMap<String, u64>,
627) -> Result<()> {
628    let result = result?;
629
630    // fetch tag list
631    let table = manager
632        .table(
633            query_ctx.current_catalog(),
634            &query_ctx.current_schema(),
635            table_name,
636            Some(query_ctx),
637        )
638        .await
639        .context(CatalogSnafu)?
640        .with_context(|| TableNotFoundSnafu {
641            catalog: query_ctx.current_catalog(),
642            schema: query_ctx.current_schema(),
643            table: table_name,
644        })?;
645    let tag_columns = table
646        .primary_key_columns()
647        .map(|c| c.name)
648        .collect::<HashSet<_>>();
649
650    match result.data {
651        OutputData::RecordBatches(batches) => {
652            record_batches_to_series(batches, series, table_name, &tag_columns)
653        }
654        OutputData::Stream(stream) => {
655            let batches = RecordBatches::try_collect(stream)
656                .await
657                .context(CollectRecordbatchSnafu)?;
658            record_batches_to_series(batches, series, table_name, &tag_columns)
659        }
660        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
661            reason: "expected data result, but got affected rows".to_string(),
662            location: Location::default(),
663        }),
664    }?;
665
666    if let Some(ref plan) = result.meta.plan {
667        collect_plan_metrics(plan, &mut [metrics]);
668    }
669    Ok(())
670}
671
672/// Retrieve labels name from query result
673async fn retrieve_labels_name_from_query_result(
674    result: Result<Output>,
675    labels: &mut HashSet<String>,
676    metrics: &mut HashMap<String, u64>,
677) -> Result<()> {
678    let result = result?;
679    match result.data {
680        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
681        OutputData::Stream(stream) => {
682            let batches = RecordBatches::try_collect(stream)
683                .await
684                .context(CollectRecordbatchSnafu)?;
685            record_batches_to_labels_name(batches, labels)
686        }
687        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
688            reason: "expected data result, but got affected rows".to_string(),
689        }
690        .fail(),
691    }?;
692    if let Some(ref plan) = result.meta.plan {
693        collect_plan_metrics(plan, &mut [metrics]);
694    }
695    Ok(())
696}
697
698fn record_batches_to_series(
699    batches: RecordBatches,
700    series: &mut Vec<HashMap<String, String>>,
701    table_name: &str,
702    tag_columns: &HashSet<String>,
703) -> Result<()> {
704    for batch in batches.iter() {
705        // project record batch to only contains tag columns
706        let projection = batch
707            .schema
708            .column_schemas()
709            .iter()
710            .enumerate()
711            .filter_map(|(idx, col)| {
712                if tag_columns.contains(&col.name) {
713                    Some(idx)
714                } else {
715                    None
716                }
717            })
718            .collect::<Vec<_>>();
719        let batch = batch
720            .try_project(&projection)
721            .context(CollectRecordbatchSnafu)?;
722
723        for row in batch.rows() {
724            let mut element: HashMap<String, String> = row
725                .iter()
726                .enumerate()
727                .map(|(idx, column)| {
728                    let column_name = batch.schema.column_name_by_index(idx);
729                    (column_name.to_string(), column.to_string())
730                })
731                .collect();
732            let _ = element.insert("__name__".to_string(), table_name.to_string());
733            series.push(element);
734        }
735    }
736    Ok(())
737}
738
739/// Retrieve labels name from record batches
740fn record_batches_to_labels_name(
741    batches: RecordBatches,
742    labels: &mut HashSet<String>,
743) -> Result<()> {
744    let mut column_indices = Vec::new();
745    let mut field_column_indices = Vec::new();
746    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
747        if let ConcreteDataType::Float64(_) = column.data_type {
748            field_column_indices.push(i);
749        }
750        column_indices.push(i);
751    }
752
753    if field_column_indices.is_empty() {
754        return Err(Error::Internal {
755            err_msg: "no value column found".to_string(),
756        });
757    }
758
759    for batch in batches.iter() {
760        let names = column_indices
761            .iter()
762            .map(|c| batches.schema().column_name_by_index(*c).to_string())
763            .collect::<Vec<_>>();
764
765        let field_columns = field_column_indices
766            .iter()
767            .map(|i| {
768                batch
769                    .column(*i)
770                    .as_any()
771                    .downcast_ref::<Float64Vector>()
772                    .unwrap()
773            })
774            .collect::<Vec<_>>();
775
776        for row_index in 0..batch.num_rows() {
777            // if all field columns are null, skip this row
778            if field_columns
779                .iter()
780                .all(|c| c.get_data(row_index).is_none())
781            {
782                continue;
783            }
784
785            // if a field is not null, record the tag name and return
786            names.iter().for_each(|name| {
787                let _ = labels.insert(name.to_string());
788            });
789            return Ok(());
790        }
791    }
792    Ok(())
793}
794
795pub(crate) fn retrieve_metric_name_and_result_type(
796    promql: &str,
797) -> Result<(Option<String>, ValueType)> {
798    let promql_expr = promql_parser::parser::parse(promql)
799        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
800    let metric_name = promql_expr_to_metric_name(&promql_expr);
801    let result_type = promql_expr.value_type();
802
803    Ok((metric_name, result_type))
804}
805
806/// Tries to get catalog and schema from an optional db param. And retrieves
807/// them from [QueryContext] if they don't present.
808pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
809    if let Some(db) = db {
810        parse_catalog_and_schema_from_db_string(db)
811    } else {
812        (
813            ctx.current_catalog().to_string(),
814            ctx.current_schema().to_string(),
815        )
816    }
817}
818
819/// Update catalog and schema in [QueryContext] if necessary.
820pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
821    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
822        ctx.set_current_catalog(catalog);
823        ctx.set_current_schema(schema);
824    }
825}
826
827fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
828    let mut metric_names = HashSet::new();
829    collect_metric_names(expr, &mut metric_names);
830
831    // Return the metric name only if there's exactly one unique metric name
832    if metric_names.len() == 1 {
833        metric_names.into_iter().next()
834    } else {
835        None
836    }
837}
838
839/// Recursively collect all metric names from a PromQL expression
840fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
841    match expr {
842        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
843            match modifier {
844                Some(LabelModifier::Include(labels)) => {
845                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
846                        metric_names.clear();
847                        return;
848                    }
849                }
850                Some(LabelModifier::Exclude(labels)) => {
851                    if labels.labels.contains(&METRIC_NAME.to_string()) {
852                        metric_names.clear();
853                        return;
854                    }
855                }
856                _ => {}
857            }
858            collect_metric_names(expr, metric_names)
859        }
860        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
861        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
862            if matches!(
863                op.id(),
864                token::T_LAND // INTERSECT
865                    | token::T_LOR // UNION
866                    | token::T_LUNLESS // EXCEPT
867            ) {
868                collect_metric_names(lhs, metric_names)
869            } else {
870                metric_names.clear()
871            }
872        }
873        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
874        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
875        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
876            if let Some(name) = name {
877                metric_names.insert(name.clone());
878            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
879                metric_names.insert(matcher.value);
880            }
881        }
882        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
883            let VectorSelector { name, matchers, .. } = vs;
884            if let Some(name) = name {
885                metric_names.insert(name.clone());
886            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
887                metric_names.insert(matcher.value);
888            }
889        }
890        PromqlExpr::Call(Call { args, .. }) => {
891            args.args
892                .iter()
893                .for_each(|e| collect_metric_names(e, metric_names));
894        }
895        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
896    }
897}
898
899fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
900where
901    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
902{
903    match expr {
904        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
905        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
906        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
907            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
908        }
909        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
910        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
911        PromqlExpr::NumberLiteral(_) => None,
912        PromqlExpr::StringLiteral(_) => None,
913        PromqlExpr::Extension(_) => None,
914        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
915        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
916            let VectorSelector { name, matchers, .. } = vs;
917
918            f(name, matchers)
919        }
920        PromqlExpr::Call(Call { args, .. }) => args
921            .args
922            .iter()
923            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
924    }
925}
926
927/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
928fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
929    find_metric_name_and_matchers(expr, |name, matchers| {
930        // Has name, ignore the matchers
931        if name.is_some() {
932            return None;
933        }
934
935        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
936        Some(matchers.find_matchers(METRIC_NAME))
937    })
938    .map(|matchers| {
939        matchers
940            .into_iter()
941            .filter(|m| !matches!(m.op, MatchOp::Equal))
942            .map(normalize_matcher)
943            .collect::<Vec<_>>()
944    })
945}
946
947/// Update the `__name__` matchers in expression into special value
948/// Returns the updated expression.
949fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
950    match expr {
951        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
952            update_metric_name_matcher(expr, metric_name)
953        }
954        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
955        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
956            update_metric_name_matcher(lhs, metric_name);
957            update_metric_name_matcher(rhs, metric_name);
958        }
959        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
960        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
961            update_metric_name_matcher(expr, metric_name)
962        }
963        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
964            if name.is_some() {
965                return;
966            }
967
968            for m in &mut matchers.matchers {
969                if m.name == METRIC_NAME {
970                    m.op = MatchOp::Equal;
971                    m.value = metric_name.to_string();
972                }
973            }
974        }
975        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
976            let VectorSelector { name, matchers, .. } = vs;
977            if name.is_some() {
978                return;
979            }
980
981            for m in &mut matchers.matchers {
982                if m.name == METRIC_NAME {
983                    m.op = MatchOp::Equal;
984                    m.value = metric_name.to_string();
985                }
986            }
987        }
988        PromqlExpr::Call(Call { args, .. }) => {
989            args.args.iter_mut().for_each(|e| {
990                update_metric_name_matcher(e, metric_name);
991            });
992        }
993        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
994    }
995}
996
997#[derive(Debug, Default, Serialize, Deserialize)]
998pub struct LabelValueQuery {
999    start: Option<String>,
1000    end: Option<String>,
1001    lookback: Option<String>,
1002    #[serde(flatten)]
1003    matches: Matches,
1004    db: Option<String>,
1005    limit: Option<usize>,
1006}
1007
1008#[axum_macros::debug_handler]
1009#[tracing::instrument(
1010    skip_all,
1011    fields(protocol = "prometheus", request_type = "label_values_query")
1012)]
1013pub async fn label_values_query(
1014    State(handler): State<PrometheusHandlerRef>,
1015    Path(label_name): Path<String>,
1016    Extension(mut query_ctx): Extension<QueryContext>,
1017    Query(params): Query<LabelValueQuery>,
1018) -> PrometheusJsonResponse {
1019    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
1020    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1021    let query_ctx = Arc::new(query_ctx);
1022
1023    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1024        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
1025        .start_timer();
1026
1027    if label_name == METRIC_NAME_LABEL {
1028        let catalog_manager = handler.catalog_manager();
1029
1030        let mut table_names = try_call_return_response!(
1031            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
1032        );
1033
1034        truncate_results(&mut table_names, params.limit);
1035        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
1036    } else if label_name == FIELD_NAME_LABEL {
1037        let field_columns = handle_schema_err!(
1038            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
1039        )
1040        .unwrap_or_default();
1041        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
1042        field_columns.sort_unstable();
1043        truncate_results(&mut field_columns, params.limit);
1044        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
1045    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
1046        let catalog_manager = handler.catalog_manager();
1047
1048        let mut schema_names = try_call_return_response!(
1049            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
1050        );
1051        truncate_results(&mut schema_names, params.limit);
1052        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
1053    }
1054
1055    let queries = params.matches.0;
1056    if queries.is_empty() {
1057        return PrometheusJsonResponse::error(
1058            StatusCode::InvalidArguments,
1059            "match[] parameter is required",
1060        );
1061    }
1062
1063    let start = params.start.unwrap_or_else(yesterday_rfc3339);
1064    let end = params.end.unwrap_or_else(current_time_rfc3339);
1065    let mut label_values = HashSet::new();
1066
1067    let start = try_call_return_response!(
1068        QueryLanguageParser::parse_promql_timestamp(&start)
1069            .context(ParseTimestampSnafu { timestamp: &start })
1070    );
1071    let end = try_call_return_response!(
1072        QueryLanguageParser::parse_promql_timestamp(&end)
1073            .context(ParseTimestampSnafu { timestamp: &end })
1074    );
1075
1076    for query in queries {
1077        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1078        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1079            return PrometheusJsonResponse::error(
1080                StatusCode::InvalidArguments,
1081                "expected vector selector",
1082            );
1083        };
1084        let Some(name) = take_metric_name(&mut vector_selector) else {
1085            return PrometheusJsonResponse::error(
1086                StatusCode::InvalidArguments,
1087                "expected metric name",
1088            );
1089        };
1090        let VectorSelector { matchers, .. } = vector_selector;
1091        // Only use and filter matchers.
1092        let matchers = matchers.matchers;
1093        let result = handler
1094            .query_label_values(
1095                name,
1096                label_name.to_string(),
1097                matchers,
1098                start,
1099                end,
1100                &query_ctx,
1101            )
1102            .await;
1103        if let Some(result) = handle_schema_err!(result) {
1104            label_values.extend(result.into_iter());
1105        }
1106    }
1107
1108    let mut label_values: Vec<_> = label_values.into_iter().collect();
1109    label_values.sort_unstable();
1110    truncate_results(&mut label_values, params.limit);
1111
1112    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1113}
1114
1115fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
1116    if let Some(limit) = limit
1117        && limit > 0
1118        && label_values.len() >= limit
1119    {
1120        label_values.truncate(limit);
1121    }
1122}
1123
1124/// Take metric name from the [VectorSelector].
1125/// It takes the name in the selector or removes the name matcher.
1126fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1127    if let Some(name) = selector.name.take() {
1128        return Some(name);
1129    }
1130
1131    let (pos, matcher) = selector
1132        .matchers
1133        .matchers
1134        .iter()
1135        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1136    let name = matcher.value.clone();
1137    // We need to remove the name matcher to avoid using it as a filter in query.
1138    selector.matchers.matchers.remove(pos);
1139
1140    Some(name)
1141}
1142
1143async fn retrieve_table_names(
1144    query_ctx: &QueryContext,
1145    catalog_manager: CatalogManagerRef,
1146    matches: Vec<String>,
1147) -> Result<Vec<String>> {
1148    let catalog = query_ctx.current_catalog();
1149    let schema = query_ctx.current_schema();
1150
1151    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
1152    let mut table_names = Vec::new();
1153
1154    // we only provide very limited support for matcher against __name__
1155    let name_matcher = matches
1156        .first()
1157        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
1158        .and_then(|expr| {
1159            if let PromqlExpr::VectorSelector(vector_selector) = expr {
1160                let matchers = vector_selector.matchers.matchers;
1161                for matcher in matchers {
1162                    if matcher.name == METRIC_NAME_LABEL {
1163                        return Some(matcher);
1164                    }
1165                }
1166
1167                None
1168            } else {
1169                None
1170            }
1171        });
1172
1173    while let Some(table) = tables_stream.next().await {
1174        let table = table.context(CatalogSnafu)?;
1175        if !table
1176            .table_info()
1177            .meta
1178            .options
1179            .extra_options
1180            .contains_key(LOGICAL_TABLE_METADATA_KEY)
1181        {
1182            // skip non-prometheus (non-metricengine) tables for __name__ query
1183            continue;
1184        }
1185
1186        let table_name = &table.table_info().name;
1187
1188        if let Some(matcher) = &name_matcher {
1189            match &matcher.op {
1190                MatchOp::Equal => {
1191                    if table_name == &matcher.value {
1192                        table_names.push(table_name.clone());
1193                    }
1194                }
1195                MatchOp::Re(reg) => {
1196                    if reg.is_match(table_name) {
1197                        table_names.push(table_name.clone());
1198                    }
1199                }
1200                _ => {
1201                    // != and !~ are not supported:
1202                    // vector must contains at least one non-empty matcher
1203                    table_names.push(table_name.clone());
1204                }
1205            }
1206        } else {
1207            table_names.push(table_name.clone());
1208        }
1209    }
1210
1211    table_names.sort_unstable();
1212    Ok(table_names)
1213}
1214
1215async fn retrieve_field_names(
1216    query_ctx: &QueryContext,
1217    manager: CatalogManagerRef,
1218    matches: Vec<String>,
1219) -> Result<HashSet<String>> {
1220    let mut field_columns = HashSet::new();
1221    let catalog = query_ctx.current_catalog();
1222    let schema = query_ctx.current_schema();
1223
1224    if matches.is_empty() {
1225        // query all tables if no matcher is provided
1226        while let Some(table) = manager
1227            .tables(catalog, &schema, Some(query_ctx))
1228            .next()
1229            .await
1230        {
1231            let table = table.context(CatalogSnafu)?;
1232            for column in table.field_columns() {
1233                field_columns.insert(column.name);
1234            }
1235        }
1236        return Ok(field_columns);
1237    }
1238
1239    for table_name in matches {
1240        let table = manager
1241            .table(catalog, &schema, &table_name, Some(query_ctx))
1242            .await
1243            .context(CatalogSnafu)?
1244            .with_context(|| TableNotFoundSnafu {
1245                catalog: catalog.to_string(),
1246                schema: schema.to_string(),
1247                table: table_name.to_string(),
1248            })?;
1249
1250        for column in table.field_columns() {
1251            field_columns.insert(column.name);
1252        }
1253    }
1254    Ok(field_columns)
1255}
1256
1257async fn retrieve_schema_names(
1258    query_ctx: &QueryContext,
1259    catalog_manager: CatalogManagerRef,
1260    matches: Vec<String>,
1261) -> Result<Vec<String>> {
1262    let mut schemas = Vec::new();
1263    let catalog = query_ctx.current_catalog();
1264
1265    let candidate_schemas = catalog_manager
1266        .schema_names(catalog, Some(query_ctx))
1267        .await
1268        .context(CatalogSnafu)?;
1269
1270    for schema in candidate_schemas {
1271        let mut found = true;
1272        for match_item in &matches {
1273            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1274                let exists = catalog_manager
1275                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1276                    .await
1277                    .context(CatalogSnafu)?;
1278                if !exists {
1279                    found = false;
1280                    break;
1281                }
1282            }
1283        }
1284
1285        if found {
1286            schemas.push(schema);
1287        }
1288    }
1289
1290    schemas.sort_unstable();
1291
1292    Ok(schemas)
1293}
1294
1295/// Try to parse and extract the name of referenced metric from the promql query.
1296///
1297/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1298/// Multiple references to the same metric are allowed.
1299fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1300    let promql_expr = promql_parser::parser::parse(query).ok()?;
1301    promql_expr_to_metric_name(&promql_expr)
1302}
1303
1304#[derive(Debug, Default, Serialize, Deserialize)]
1305pub struct SeriesQuery {
1306    start: Option<String>,
1307    end: Option<String>,
1308    lookback: Option<String>,
1309    #[serde(flatten)]
1310    matches: Matches,
1311    db: Option<String>,
1312}
1313
1314#[axum_macros::debug_handler]
1315#[tracing::instrument(
1316    skip_all,
1317    fields(protocol = "prometheus", request_type = "series_query")
1318)]
1319pub async fn series_query(
1320    State(handler): State<PrometheusHandlerRef>,
1321    Query(params): Query<SeriesQuery>,
1322    Extension(mut query_ctx): Extension<QueryContext>,
1323    Form(form_params): Form<SeriesQuery>,
1324) -> PrometheusJsonResponse {
1325    let mut queries: Vec<String> = params.matches.0;
1326    if queries.is_empty() {
1327        queries = form_params.matches.0;
1328    }
1329    if queries.is_empty() {
1330        return PrometheusJsonResponse::error(
1331            StatusCode::Unsupported,
1332            "match[] parameter is required",
1333        );
1334    }
1335    let start = params
1336        .start
1337        .or(form_params.start)
1338        .unwrap_or_else(yesterday_rfc3339);
1339    let end = params
1340        .end
1341        .or(form_params.end)
1342        .unwrap_or_else(current_time_rfc3339);
1343    let lookback = params
1344        .lookback
1345        .or(form_params.lookback)
1346        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1347
1348    // update catalog and schema in query context if necessary
1349    if let Some(db) = &params.db {
1350        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1351        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1352    }
1353    let query_ctx = Arc::new(query_ctx);
1354
1355    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1356        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1357        .start_timer();
1358
1359    let mut series = Vec::new();
1360    let mut merge_map = HashMap::new();
1361    for query in queries {
1362        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1363        let prom_query = PromQuery {
1364            query,
1365            start: start.clone(),
1366            end: end.clone(),
1367            // TODO: find a better value for step
1368            step: DEFAULT_LOOKBACK_STRING.to_string(),
1369            lookback: lookback.clone(),
1370        };
1371        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1372
1373        handle_schema_err!(
1374            retrieve_series_from_query_result(
1375                result,
1376                &mut series,
1377                &query_ctx,
1378                &table_name,
1379                &handler.catalog_manager(),
1380                &mut merge_map,
1381            )
1382            .await
1383        );
1384    }
1385    let merge_map = merge_map
1386        .into_iter()
1387        .map(|(k, v)| (k, Value::from(v)))
1388        .collect();
1389    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1390    resp.resp_metrics = merge_map;
1391    resp
1392}
1393
1394#[derive(Debug, Default, Serialize, Deserialize)]
1395pub struct ParseQuery {
1396    query: Option<String>,
1397    db: Option<String>,
1398}
1399
1400#[axum_macros::debug_handler]
1401#[tracing::instrument(
1402    skip_all,
1403    fields(protocol = "prometheus", request_type = "parse_query")
1404)]
1405pub async fn parse_query(
1406    State(_handler): State<PrometheusHandlerRef>,
1407    Query(params): Query<ParseQuery>,
1408    Extension(_query_ctx): Extension<QueryContext>,
1409    Form(form_params): Form<ParseQuery>,
1410) -> PrometheusJsonResponse {
1411    if let Some(query) = params.query.or(form_params.query) {
1412        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1413        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1414    } else {
1415        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1416    }
1417}
1418
1419#[cfg(test)]
1420mod tests {
1421    use promql_parser::parser::value::ValueType;
1422
1423    use super::*;
1424
1425    struct TestCase {
1426        name: &'static str,
1427        promql: &'static str,
1428        expected_metric: Option<&'static str>,
1429        expected_type: ValueType,
1430        should_error: bool,
1431    }
1432
1433    #[test]
1434    fn test_retrieve_metric_name_and_result_type() {
1435        let test_cases = &[
1436            // Single metric cases
1437            TestCase {
1438                name: "simple metric",
1439                promql: "cpu_usage",
1440                expected_metric: Some("cpu_usage"),
1441                expected_type: ValueType::Vector,
1442                should_error: false,
1443            },
1444            TestCase {
1445                name: "metric with selector",
1446                promql: r#"cpu_usage{instance="localhost"}"#,
1447                expected_metric: Some("cpu_usage"),
1448                expected_type: ValueType::Vector,
1449                should_error: false,
1450            },
1451            TestCase {
1452                name: "metric with range selector",
1453                promql: "cpu_usage[5m]",
1454                expected_metric: Some("cpu_usage"),
1455                expected_type: ValueType::Matrix,
1456                should_error: false,
1457            },
1458            TestCase {
1459                name: "metric with __name__ matcher",
1460                promql: r#"{__name__="cpu_usage"}"#,
1461                expected_metric: Some("cpu_usage"),
1462                expected_type: ValueType::Vector,
1463                should_error: false,
1464            },
1465            TestCase {
1466                name: "metric with unary operator",
1467                promql: "-cpu_usage",
1468                expected_metric: None,
1469                expected_type: ValueType::Vector,
1470                should_error: false,
1471            },
1472            // Aggregation and function cases
1473            TestCase {
1474                name: "metric with aggregation",
1475                promql: "sum(cpu_usage)",
1476                expected_metric: Some("cpu_usage"),
1477                expected_type: ValueType::Vector,
1478                should_error: false,
1479            },
1480            TestCase {
1481                name: "complex aggregation",
1482                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
1483                expected_metric: None,
1484                expected_type: ValueType::Vector,
1485                should_error: false,
1486            },
1487            TestCase {
1488                name: "complex aggregation",
1489                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
1490                expected_metric: Some("cpu_usage"),
1491                expected_type: ValueType::Vector,
1492                should_error: false,
1493            },
1494            TestCase {
1495                name: "complex aggregation",
1496                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
1497                expected_metric: Some("cpu_usage"),
1498                expected_type: ValueType::Vector,
1499                should_error: false,
1500            },
1501            // Same metric binary operations
1502            TestCase {
1503                name: "same metric addition",
1504                promql: "cpu_usage + cpu_usage",
1505                expected_metric: None,
1506                expected_type: ValueType::Vector,
1507                should_error: false,
1508            },
1509            TestCase {
1510                name: "metric with scalar addition",
1511                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
1512                expected_metric: None,
1513                expected_type: ValueType::Vector,
1514                should_error: false,
1515            },
1516            // Multiple metrics cases
1517            TestCase {
1518                name: "different metrics addition",
1519                promql: "cpu_usage + memory_usage",
1520                expected_metric: None,
1521                expected_type: ValueType::Vector,
1522                should_error: false,
1523            },
1524            TestCase {
1525                name: "different metrics subtraction",
1526                promql: "network_in - network_out",
1527                expected_metric: None,
1528                expected_type: ValueType::Vector,
1529                should_error: false,
1530            },
1531            // Unless operator cases
1532            TestCase {
1533                name: "unless with different metrics",
1534                promql: "cpu_usage unless memory_usage",
1535                expected_metric: Some("cpu_usage"),
1536                expected_type: ValueType::Vector,
1537                should_error: false,
1538            },
1539            TestCase {
1540                name: "unless with same metric",
1541                promql: "cpu_usage unless cpu_usage",
1542                expected_metric: Some("cpu_usage"),
1543                expected_type: ValueType::Vector,
1544                should_error: false,
1545            },
1546            // Subquery cases
1547            TestCase {
1548                name: "basic subquery",
1549                promql: "cpu_usage[5m:1m]",
1550                expected_metric: Some("cpu_usage"),
1551                expected_type: ValueType::Matrix,
1552                should_error: false,
1553            },
1554            TestCase {
1555                name: "subquery with multiple metrics",
1556                promql: "(cpu_usage + memory_usage)[5m:1m]",
1557                expected_metric: None,
1558                expected_type: ValueType::Matrix,
1559                should_error: false,
1560            },
1561            // Literal values
1562            TestCase {
1563                name: "scalar value",
1564                promql: "42",
1565                expected_metric: None,
1566                expected_type: ValueType::Scalar,
1567                should_error: false,
1568            },
1569            TestCase {
1570                name: "string literal",
1571                promql: r#""hello world""#,
1572                expected_metric: None,
1573                expected_type: ValueType::String,
1574                should_error: false,
1575            },
1576            // Error cases
1577            TestCase {
1578                name: "invalid syntax",
1579                promql: "cpu_usage{invalid=",
1580                expected_metric: None,
1581                expected_type: ValueType::Vector,
1582                should_error: true,
1583            },
1584            TestCase {
1585                name: "empty query",
1586                promql: "",
1587                expected_metric: None,
1588                expected_type: ValueType::Vector,
1589                should_error: true,
1590            },
1591            TestCase {
1592                name: "malformed brackets",
1593                promql: "cpu_usage[5m",
1594                expected_metric: None,
1595                expected_type: ValueType::Vector,
1596                should_error: true,
1597            },
1598        ];
1599
1600        for test_case in test_cases {
1601            let result = retrieve_metric_name_and_result_type(test_case.promql);
1602
1603            if test_case.should_error {
1604                assert!(
1605                    result.is_err(),
1606                    "Test '{}' should have failed but succeeded with: {:?}",
1607                    test_case.name,
1608                    result
1609                );
1610            } else {
1611                let (metric_name, value_type) = result.unwrap_or_else(|e| {
1612                    panic!(
1613                        "Test '{}' should have succeeded but failed with error: {}",
1614                        test_case.name, e
1615                    )
1616                });
1617
1618                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
1619                assert_eq!(
1620                    metric_name, expected_metric_name,
1621                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
1622                    test_case.name, expected_metric_name, metric_name
1623                );
1624
1625                assert_eq!(
1626                    value_type, test_case.expected_type,
1627                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
1628                    test_case.name, test_case.expected_type, value_type
1629                );
1630            }
1631        }
1632    }
1633}