servers/http/prometheus.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Supplies the Prometheus-compliant HTTP API server.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_version::OwnedBuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::Float64Vector;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
use promql_parser::parser::token::{self};
use promql_parser::parser::value::ValueType;
use promql_parser::parser::{
    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
    SubqueryExpr, UnaryExpr, VectorSelector,
};
use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
use query::promql::planner::normalize_matcher;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
};

pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
    TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    pub metric: BTreeMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}

/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    pub metric: BTreeMap<String, String>,
    pub values: Vec<(f64, String)>,
}

/// Variants corresponding to [ValueType]
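///
/// Serialized with `#[serde(untagged)]`, so only the inner value appears in the JSON
/// body. A rough sketch of a matrix result (illustrative metric and values only):
///
/// ```text
/// "result": [
///   { "metric": { "__name__": "cpu_usage", "host": "a" },
///     "values": [[1690000000.0, "0.5"], [1690000015.0, "0.6"]] }
/// ]
/// ```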
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}

impl Default for PromQueryResult {
    fn default() -> Self {
        PromQueryResult::Matrix(Default::default())
    }
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<String, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}

impl PrometheusResponse {
    /// Append the other [`PrometheusResponse`].
    /// # NOTE
    ///   Only matrix and vector results are appended; any other response is ignored.
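    ///
    /// A sketch of the intended behaviour (hypothetical data, not a doctest):
    ///
    /// ```text
    /// // lhs: PromData with a Matrix result for metric "a"
    /// // rhs: PromData with a Matrix result for metric "b"
    /// lhs.append(rhs);
    /// // lhs now holds the series of both "a" and "b"; mixing Matrix with
    /// // Vector (or any other variant) leaves lhs unchanged.
    /// ```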
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            _ => {
                // TODO(dennis): process other cases?
            }
        }
    }

    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "format_query")
)]
pub async fn format_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let query = params.query.or(form_params.query).unwrap_or_default();
    match promql_parser::parser::parse(&query) {
        Ok(expr) => {
            let pretty = expr.prettify();
            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
        }
        Err(reason) => {
            let err = InvalidQuerySnafu { reason }.build();
            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
    let build_info = common_version::build_info().clone();
    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

/// Helper macro that tries to evaluate the expression and returns its result.
/// If the evaluation fails, it returns a `PrometheusJsonResponse` early.
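///
/// Intended usage (sketch): `let expr = try_call_return_response!(promql_parser::parser::parse(&query));`
/// yields the parsed expression on `Ok`, or early-returns an `InvalidArguments` error response on `Err`.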
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

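    // When the query selects metrics only through non-equality `__name__` matchers
    // (e.g. a regex matcher), resolve the matching metric names first, then run one
    // instant query per metric and merge the per-metric responses below.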
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

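        // Safety: at least one response, checked above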
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}

/// Executes a single instant query and returns the response
async fn do_instant_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name, result_type),
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

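    // Same fan-out as in `instant_query`: resolve non-equality `__name__` matchers to
    // concrete metric names, run one range query per metric, and merge the responses.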
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one response, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}

/// Executes a single range query and returns the response
async fn do_range_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
        Ok((metric_name, _)) => metric_name,
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}

#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

// Custom `Deserialize` impl to support parsing repeated `match[]` parameters
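// For example (sketch): a request query string like `?match[]=up&match[]=process_start_time_seconds`
// deserializes into `Matches(vec!["up".into(), "process_start_time_seconds".into()])`;
// the metric names here are purely illustrative.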
impl<'de> Deserialize<'de> for Matches {
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}

/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Prometheus won't report an error when querying nonexistent labels and metrics
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    // Fetch all tag columns. They will be used as an allow-list for tag names.
    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    // insert the special metric name label
    let _ = labels.insert(METRIC_NAME.to_string());

    // Fetch all columns if no query matcher is provided
    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    // Otherwise, run queries and extract column name from result set.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    // intersect `fetched_labels` with `labels` to filter out non-tag columns
    fetched_labels.retain(|l| labels.contains(l));
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}

/// Get all tag column names of the given schema
async fn get_all_column_names(
    catalog: &str,
    schema: &str,
    manager: &CatalogManagerRef,
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
    let table_names = manager.table_names(catalog, schema, None).await?;

    let mut labels = HashSet::new();
    for table_name in table_names {
        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
            continue;
        };
        for column in table.primary_key_columns() {
            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
            {
                labels.insert(column.name);
            }
        }
    }

    Ok(labels)
}

async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<String, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // fetch tag list
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

/// Retrieve label names from the query result
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<String, String>>,
    table_name: &str,
    tag_columns: &HashSet<String>,
) -> Result<()> {
    for batch in batches.iter() {
        // project the record batch so it contains only tag columns
        let projection = batch
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| {
                if tag_columns.contains(&col.name) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let batch = batch
            .try_project(&projection)
            .context(CollectRecordbatchSnafu)?;

        for row in batch.rows() {
            let mut element: HashMap<String, String> = row
                .iter()
                .enumerate()
                .map(|(idx, column)| {
                    let column_name = batch.schema.column_name_by_index(idx);
                    (column_name.to_string(), column.to_string())
                })
                .collect();
            let _ = element.insert("__name__".to_string(), table_name.to_string());
            series.push(element);
        }
    }
    Ok(())
}

/// Retrieve label names from record batches
fn record_batches_to_labels_name(
    batches: RecordBatches,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let mut column_indices = Vec::new();
    let mut field_column_indices = Vec::new();
    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
        if let ConcreteDataType::Float64(_) = column.data_type {
            field_column_indices.push(i);
        }
        column_indices.push(i);
    }

    if field_column_indices.is_empty() {
        return Err(Error::Internal {
            err_msg: "no value column found".to_string(),
        });
    }

    for batch in batches.iter() {
        let names = column_indices
            .iter()
            .map(|c| batches.schema().column_name_by_index(*c).to_string())
            .collect::<Vec<_>>();

        let field_columns = field_column_indices
            .iter()
            .map(|i| {
                batch
                    .column(*i)
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
            })
            .collect::<Vec<_>>();

        for row_index in 0..batch.num_rows() {
            // if all field columns are null, skip this row
            if field_columns
                .iter()
                .all(|c| c.get_data(row_index).is_none())
            {
                continue;
            }

            // if any field is not null, record all the column names and return
            names.iter().for_each(|name| {
                let _ = labels.insert(name.clone());
            });
            return Ok(());
        }
    }
    Ok(())
}

pub(crate) fn retrieve_metric_name_and_result_type(
    promql: &str,
) -> Result<(Option<String>, ValueType)> {
    let promql_expr = promql_parser::parser::parse(promql)
        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
    let metric_name = promql_expr_to_metric_name(&promql_expr);
    let result_type = promql_expr.value_type();

    Ok((metric_name, result_type))
}

/// Tries to get the catalog and schema from an optional `db` param, falling back
/// to the values in [QueryContext] when it is not provided.
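///
/// For example (sketch): `db = None` resolves to the context's current catalog and
/// schema, while `db = Some(..)` is parsed by [parse_catalog_and_schema_from_db_string]
/// (typically a `{catalog}-{schema}` style string).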
pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
    if let Some(db) = db {
        parse_catalog_and_schema_from_db_string(db)
    } else {
        (
            ctx.current_catalog().to_string(),
            ctx.current_schema().clone(),
        )
    }
}

/// Update catalog and schema in [QueryContext] if necessary.
pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
        ctx.set_current_catalog(catalog);
        ctx.set_current_schema(schema);
    }
}

fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    let mut metric_names = HashSet::new();
    collect_metric_names(expr, &mut metric_names);

    // Return the metric name only if there's exactly one unique metric name
    if metric_names.len() == 1 {
        metric_names.into_iter().next()
    } else {
        None
    }
}

/// Recursively collect all metric names from a PromQL expression
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            if matches!(
                op.id(),
                token::T_LAND // INTERSECT
                    | token::T_LOR // UNION
                    | token::T_LUNLESS // EXCEPT
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}

/// Try to find the `__name__` matchers whose op is not `MatchOp::Equal`.
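///
/// For example (sketch): the hypothetical selector `{__name__=~"api_.*"}` yields a
/// single regex matcher on `__name__`, while `api_http_requests{job="x"}` yields `None`
/// because the metric name is given directly.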
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        // Has name, ignore the matchers
        if name.is_some() {
            return None;
        }

        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .map(normalize_matcher)
            .collect::<Vec<_>>()
    })
}

/// Update the `__name__` matchers in the expression to equal the given metric name.
/// The expression is modified in place.
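///
/// For example (sketch): with `metric_name = "api_http"`, the hypothetical query
/// `sum(rate({__name__=~"api_.*"}[5m]))` becomes `sum(rate({__name__="api_http"}[5m]))`;
/// selectors that already carry an explicit metric name are left untouched.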
fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            update_metric_name_matcher(lhs, metric_name);
            update_metric_name_matcher(rhs, metric_name);
        }
        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args.iter_mut().for_each(|e| {
                update_metric_name_matcher(e, metric_name);
            });
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
    limit: Option<usize>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only the matchers are used as filters.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}

fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    if let Some(limit) = limit
        && limit > 0
        && label_values.len() >= limit
    {
        label_values.truncate(limit);
    }
}

/// Take the metric name from the [VectorSelector].
/// It either takes the selector's explicit name, or extracts the value of the
/// `__name__` equality matcher and removes that matcher.
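///
/// For example (sketch): `http_requests{job="api"}` yields `Some("http_requests")`, and
/// `{__name__="http_requests",job="api"}` also yields `Some("http_requests")` while
/// removing the `__name__` matcher so it is not used as a regular label filter.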
fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
    if let Some(name) = selector.name.take() {
        return Some(name);
    }

    let (pos, matcher) = selector
        .matchers
        .matchers
        .iter()
        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
    let name = matcher.value.clone();
    // We need to remove the name matcher to avoid using it as a filter in query.
    selector.matchers.matchers.remove(pos);

    Some(name)
}

async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // we only provide very limited support for matchers against __name__
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // skip non-prometheus (non-metricengine) tables for __name__ query
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // != and !~ are not supported:
                    // a vector selector must contain at least one non-empty matcher
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}

async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // query all tables if no matcher is provided
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.clone(),
                table: table_name.clone(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}

async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    for schema in candidate_schemas {
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}

/// Try to parse and extract the name of the referenced metric from the PromQL query.
///
/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
/// Multiple references to the same metric are allowed.
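///
/// For example (sketch): `rate(cpu_usage[5m])` yields `Some("cpu_usage")`, while
/// `cpu_usage + memory_usage` yields `None` because two different metrics are referenced.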
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;
    promql_expr_to_metric_name(&promql_expr)
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    query: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "parse_query")
)]
pub async fn parse_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<ParseQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<ParseQuery>,
) -> PrometheusJsonResponse {
    if let Some(query) = params.query.or(form_params.query) {
        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
    } else {
        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
    }
}

#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    struct TestCase {
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Single metric cases
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregation and function cases
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Same metric binary operations
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Multiple metrics cases
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unless operator cases
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subquery cases
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literal values
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Error cases
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}