// servers/http/prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! prom supply the prometheus HTTP API Server compliance
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::future::join_all;
34use futures::StreamExt;
35use itertools::Itertools;
36use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
37use promql_parser::parser::value::ValueType;
38use promql_parser::parser::{
39    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
40    UnaryExpr, VectorSelector,
41};
42use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
43use query::promql::planner::normalize_matcher;
44use serde::de::{self, MapAccess, Visitor};
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use session::context::{QueryContext, QueryContextRef};
48use snafu::{Location, OptionExt, ResultExt};
49use store_api::metric_engine_consts::{
50    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
51};
52
53pub use super::result::prometheus_resp::PrometheusJsonResponse;
54use crate::error::{
55    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
56    TableNotFoundSnafu, UnexpectedResultSnafu,
57};
58use crate::http::header::collect_plan_metrics;
59use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
60use crate::prometheus_handler::PrometheusHandlerRef;
61
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    /// The single sample of this series; presumably `(timestamp, rendered
    /// value)` per the Prometheus HTTP API — omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
69
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    /// The samples of this series; presumably `(timestamp, rendered value)`
    /// pairs per the Prometheus HTTP API.
    pub values: Vec<(f64, String)>,
}
76
/// Variants corresponding to [ValueType]
///
/// Serialized untagged, so each variant is emitted as its bare JSON shape.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// One entry per series, each carrying multiple samples.
    Matrix(Vec<PromSeriesMatrix>),
    /// One entry per series, each carrying at most one sample.
    Vector(Vec<PromSeriesVector>),
    /// Optional single sample; omitted from JSON when `None`.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// Optional single sample; omitted from JSON when `None`.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
86
87impl Default for PromQueryResult {
88    fn default() -> Self {
89        PromQueryResult::Matrix(Default::default())
90    }
91}
92
/// The `data` object of a Prometheus query response:
/// `{ "resultType": ..., "result": ... }`.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    /// Stringified [ValueType] of `result`.
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
99
/// Payload of a Prometheus API response, one variant per endpoint kind.
///
/// Serialized untagged, so each variant is emitted as its bare JSON
/// representation.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<String, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    // Serialize-only: deserialization of this variant is skipped.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
114
115impl PrometheusResponse {
116    /// Append the other [`PrometheusResponse]`.
117    /// # NOTE
118    ///   Only append matrix and vector results, otherwise just ignore the other response.
119    fn append(&mut self, other: PrometheusResponse) {
120        match (self, other) {
121            (
122                PrometheusResponse::PromData(PromData {
123                    result: PromQueryResult::Matrix(lhs),
124                    ..
125                }),
126                PrometheusResponse::PromData(PromData {
127                    result: PromQueryResult::Matrix(rhs),
128                    ..
129                }),
130            ) => {
131                lhs.extend(rhs);
132            }
133
134            (
135                PrometheusResponse::PromData(PromData {
136                    result: PromQueryResult::Vector(lhs),
137                    ..
138                }),
139                PrometheusResponse::PromData(PromData {
140                    result: PromQueryResult::Vector(rhs),
141                    ..
142                }),
143            ) => {
144                lhs.extend(rhs);
145            }
146            _ => {
147                // TODO(dennis): process other cases?
148            }
149        }
150    }
151
152    pub fn is_none(&self) -> bool {
153        matches!(self, PrometheusResponse::None)
154    }
155}
156
/// Parameters of the Prometheus query-formatting API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}
161
162#[axum_macros::debug_handler]
163#[tracing::instrument(
164    skip_all,
165    fields(protocol = "prometheus", request_type = "format_query")
166)]
167pub async fn format_query(
168    State(_handler): State<PrometheusHandlerRef>,
169    Query(params): Query<InstantQuery>,
170    Extension(_query_ctx): Extension<QueryContext>,
171    Form(form_params): Form<InstantQuery>,
172) -> PrometheusJsonResponse {
173    let query = params.query.or(form_params.query).unwrap_or_default();
174    match promql_parser::parser::parse(&query) {
175        Ok(expr) => {
176            let pretty = expr.prettify();
177            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
178        }
179        Err(reason) => {
180            let err = InvalidQuerySnafu { reason }.build();
181            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
182        }
183    }
184}
185
/// Parameters of the Prometheus build-info API (currently none).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
188
189#[axum_macros::debug_handler]
190#[tracing::instrument(
191    skip_all,
192    fields(protocol = "prometheus", request_type = "build_info_query")
193)]
194pub async fn build_info_query() -> PrometheusJsonResponse {
195    let build_info = common_version::build_info().clone();
196    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
197}
198
/// Parameters of the Prometheus instant query API.
///
/// Every field is optional; handlers merge the query-string and form-body
/// copies of these parameters (query string wins).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    // NOTE(review): accepted but not referenced by the handlers in this file.
    timeout: Option<String>,
    db: Option<String>,
}
207
/// Helper macro which try to evaluate the expression and return its results.
/// If the evaluation fails, return a `PrometheusJsonResponse` early.
///
/// NOTE: all failures are reported as `StatusCode::InvalidArguments`, and the
/// expansion contains a `return`, so this may only be used inside functions
/// returning `PrometheusJsonResponse`.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
221
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
/// Handler of Prometheus instant queries.
///
/// When the metric name is selected through matchers other than plain
/// equality (as reported by `find_metric_name_not_equal_matchers`), the
/// matching metric names are resolved first and one query is issued per
/// metric, with the per-metric responses merged into a single response.
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modeled as a degenerate range query: start == end,
    // with a fixed 1s step.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        // No metric matches: return an empty result of the appropriate type.
        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Fan out: run one rewritten query per concrete metric name, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Replace the name matcher with an equality match on this metric.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: `metric_names` is non-empty (checked above), so there is at
        // least one response to reduce.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
312
313/// Executes a single instant query and returns response
314async fn do_instant_query(
315    handler: &PrometheusHandlerRef,
316    prom_query: &PromQuery,
317    query_ctx: QueryContextRef,
318) -> PrometheusJsonResponse {
319    let result = handler.do_query(prom_query, query_ctx).await;
320    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
321        Ok((metric_name, result_type)) => (metric_name, result_type),
322        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
323    };
324    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
325}
326
/// Parameters of the Prometheus range query API.
///
/// Every field is optional; handlers merge the query-string and form-body
/// copies of these parameters (query string wins).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    // NOTE(review): accepted but not referenced by the handlers in this file.
    timeout: Option<String>,
    db: Option<String>,
}
337
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
/// Handler of Prometheus range queries.
///
/// When the metric name is selected through matchers other than plain
/// equality (as reported by `find_metric_name_not_equal_matchers`), the
/// matching metric names are resolved first and one query is issued per
/// metric, with the per-metric responses merged into a single response.
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters take precedence over the form body.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        // No metric matches: return an empty matrix result.
        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Fan out: run one rewritten query per concrete metric name, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Replace the name matcher with an equality match on this metric.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one responses, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
421
422/// Executes a single range query and returns response
423async fn do_range_query(
424    handler: &PrometheusHandlerRef,
425    prom_query: &PromQuery,
426    query_ctx: QueryContextRef,
427) -> PrometheusJsonResponse {
428    let result = handler.do_query(prom_query, query_ctx).await;
429    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
430        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
431        Ok((metric_name, _)) => metric_name,
432    };
433    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
434}
435
/// Collected values of the repeated `match[]` query parameter.
/// Deserialization is hand-written (see the `Deserialize` impl below).
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
438
/// Parameters of the Prometheus label-names query API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    // `match[]` may be repeated; captured via the custom `Matches` deserializer
    // over the flattened parameter map.
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}
448
// Custom Deserialize method to support parsing repeated match[]
impl<'de> Deserialize<'de> for Matches {
    /// Collects the value of every map entry whose key is exactly `match[]`;
    /// entries with any other key are consumed and discarded.
    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct MatchesVisitor;

        impl<'d> Visitor<'d> for MatchesVisitor {
            type Value = Vec<String>;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a string")
            }

            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
            where
                M: MapAccess<'d>,
            {
                let mut matches = Vec::new();
                // Drain the whole map, keeping only `match[]` values in order.
                while let Some((key, value)) = access.next_entry::<String, String>()? {
                    if key == "match[]" {
                        matches.push(value);
                    }
                }
                Ok(matches)
            }
        }
        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
    }
}
480
/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
///
/// NOTE: the expansion contains a `return`, so this may only be used inside
/// functions returning `PrometheusJsonResponse`.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Prometheus won't report error if querying nonexist label and metric
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
504
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
/// Handler of Prometheus label-names queries.
///
/// Returns the tag (primary key) column names of the target schema, plus the
/// special `__name__` label; when `match[]` selectors are present, the result
/// is restricted to labels actually observed in the matching series.
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // `match[]` selectors may come from the query string or the form body.
    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    // Fetch all tag columns. It will be used as white-list for tag names.
    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    // insert the special metric name label
    let _ = labels.insert(METRIC_NAME.to_string());

    // Fetch all columns if no query matcher is provided
    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    // Otherwise, run queries and extract column name from result set.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    // Accumulated per-plan execution metrics across all matcher queries.
    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        // TableNotFound/TableColumnNotFound are silently ignored, matching
        // Prometheus behavior for non-existent metrics/labels.
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    // intersect `fetched_labels` with `labels` to filter out non-tag columns
    fetched_labels.retain(|l| labels.contains(l));
    // NOTE(review): `labels` already contains METRIC_NAME (inserted above) and
    // is not read after this point — this insert looks redundant; confirm.
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}
593
594/// Get all tag column name of the given schema
595async fn get_all_column_names(
596    catalog: &str,
597    schema: &str,
598    manager: &CatalogManagerRef,
599) -> std::result::Result<HashSet<String>, catalog::error::Error> {
600    let table_names = manager.table_names(catalog, schema, None).await?;
601
602    let mut labels = HashSet::new();
603    for table_name in table_names {
604        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
605            continue;
606        };
607        for column in table.primary_key_columns() {
608            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
609                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
610            {
611                labels.insert(column.name);
612            }
613        }
614    }
615
616    Ok(labels)
617}
618
/// Convert one query result into Prometheus series descriptors.
///
/// Each result row becomes one `HashMap` in `series` containing only the
/// table's tag (primary key) columns plus the synthetic `__name__` label set
/// to `table_name`. Plan execution metrics, when present, are merged into
/// `metrics`.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<String, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // fetch tag list
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // Streamed output must be fully collected before conversion.
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
670
671/// Retrieve labels name from query result
672async fn retrieve_labels_name_from_query_result(
673    result: Result<Output>,
674    labels: &mut HashSet<String>,
675    metrics: &mut HashMap<String, u64>,
676) -> Result<()> {
677    let result = result?;
678    match result.data {
679        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
680        OutputData::Stream(stream) => {
681            let batches = RecordBatches::try_collect(stream)
682                .await
683                .context(CollectRecordbatchSnafu)?;
684            record_batches_to_labels_name(batches, labels)
685        }
686        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
687            reason: "expected data result, but got affected rows".to_string(),
688        }
689        .fail(),
690    }?;
691    if let Some(ref plan) = result.meta.plan {
692        collect_plan_metrics(plan, &mut [metrics]);
693    }
694    Ok(())
695}
696
697fn record_batches_to_series(
698    batches: RecordBatches,
699    series: &mut Vec<HashMap<String, String>>,
700    table_name: &str,
701    tag_columns: &HashSet<String>,
702) -> Result<()> {
703    for batch in batches.iter() {
704        // project record batch to only contains tag columns
705        let projection = batch
706            .schema
707            .column_schemas()
708            .iter()
709            .enumerate()
710            .filter_map(|(idx, col)| {
711                if tag_columns.contains(&col.name) {
712                    Some(idx)
713                } else {
714                    None
715                }
716            })
717            .collect::<Vec<_>>();
718        let batch = batch
719            .try_project(&projection)
720            .context(CollectRecordbatchSnafu)?;
721
722        for row in batch.rows() {
723            let mut element: HashMap<String, String> = row
724                .iter()
725                .enumerate()
726                .map(|(idx, column)| {
727                    let column_name = batch.schema.column_name_by_index(idx);
728                    (column_name.to_string(), column.to_string())
729                })
730                .collect();
731            let _ = element.insert("__name__".to_string(), table_name.to_string());
732            series.push(element);
733        }
734    }
735    Ok(())
736}
737
738/// Retrieve labels name from record batches
739fn record_batches_to_labels_name(
740    batches: RecordBatches,
741    labels: &mut HashSet<String>,
742) -> Result<()> {
743    let mut column_indices = Vec::new();
744    let mut field_column_indices = Vec::new();
745    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
746        if let ConcreteDataType::Float64(_) = column.data_type {
747            field_column_indices.push(i);
748        }
749        column_indices.push(i);
750    }
751
752    if field_column_indices.is_empty() {
753        return Err(Error::Internal {
754            err_msg: "no value column found".to_string(),
755        });
756    }
757
758    for batch in batches.iter() {
759        let names = column_indices
760            .iter()
761            .map(|c| batches.schema().column_name_by_index(*c).to_string())
762            .collect::<Vec<_>>();
763
764        let field_columns = field_column_indices
765            .iter()
766            .map(|i| {
767                batch
768                    .column(*i)
769                    .as_any()
770                    .downcast_ref::<Float64Vector>()
771                    .unwrap()
772            })
773            .collect::<Vec<_>>();
774
775        for row_index in 0..batch.num_rows() {
776            // if all field columns are null, skip this row
777            if field_columns
778                .iter()
779                .all(|c| c.get_data(row_index).is_none())
780            {
781                continue;
782            }
783
784            // if a field is not null, record the tag name and return
785            names.iter().for_each(|name| {
786                let _ = labels.insert(name.to_string());
787            });
788            return Ok(());
789        }
790    }
791    Ok(())
792}
793
794pub(crate) fn retrieve_metric_name_and_result_type(
795    promql: &str,
796) -> Result<(Option<String>, ValueType)> {
797    let promql_expr = promql_parser::parser::parse(promql)
798        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
799    let metric_name = promql_expr_to_metric_name(&promql_expr);
800    let result_type = promql_expr.value_type();
801
802    Ok((metric_name, result_type))
803}
804
805/// Tries to get catalog and schema from an optional db param. And retrieves
806/// them from [QueryContext] if they don't present.
807pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
808    if let Some(db) = db {
809        parse_catalog_and_schema_from_db_string(db)
810    } else {
811        (
812            ctx.current_catalog().to_string(),
813            ctx.current_schema().to_string(),
814        )
815    }
816}
817
818/// Update catalog and schema in [QueryContext] if necessary.
819pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
820    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
821        ctx.set_current_catalog(catalog);
822        ctx.set_current_schema(schema);
823    }
824}
825
826fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
827    let mut metric_names = HashSet::new();
828    collect_metric_names(expr, &mut metric_names);
829
830    // Return the metric name only if there's exactly one unique metric name
831    if metric_names.len() == 1 {
832        metric_names.into_iter().next()
833    } else {
834        None
835    }
836}
837
838/// Recursively collect all metric names from a PromQL expression
839fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
840    match expr {
841        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
842            collect_metric_names(expr, metric_names)
843        }
844        PromqlExpr::Unary(UnaryExpr { expr }) => collect_metric_names(expr, metric_names),
845        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
846            collect_metric_names(lhs, metric_names);
847            collect_metric_names(rhs, metric_names);
848        }
849        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
850        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
851        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
852            if let Some(name) = name {
853                metric_names.insert(name.clone());
854            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
855                metric_names.insert(matcher.value);
856            }
857        }
858        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
859            let VectorSelector { name, matchers, .. } = vs;
860            if let Some(name) = name {
861                metric_names.insert(name.clone());
862            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
863                metric_names.insert(matcher.value);
864            }
865        }
866        PromqlExpr::Call(Call { args, .. }) => {
867            args.args
868                .iter()
869                .for_each(|e| collect_metric_names(e, metric_names));
870        }
871        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
872    }
873}
874
875fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
876where
877    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
878{
879    match expr {
880        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
881        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
882        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
883            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
884        }
885        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
886        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
887        PromqlExpr::NumberLiteral(_) => None,
888        PromqlExpr::StringLiteral(_) => None,
889        PromqlExpr::Extension(_) => None,
890        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
891        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
892            let VectorSelector { name, matchers, .. } = vs;
893
894            f(name, matchers)
895        }
896        PromqlExpr::Call(Call { args, .. }) => args
897            .args
898            .iter()
899            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
900    }
901}
902
903/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
904fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
905    find_metric_name_and_matchers(expr, |name, matchers| {
906        // Has name, ignore the matchers
907        if name.is_some() {
908            return None;
909        }
910
911        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
912        Some(matchers.find_matchers(METRIC_NAME))
913    })
914    .map(|matchers| {
915        matchers
916            .into_iter()
917            .filter(|m| !matches!(m.op, MatchOp::Equal))
918            .map(normalize_matcher)
919            .collect::<Vec<_>>()
920    })
921}
922
923/// Update the `__name__` matchers in expression into special value
924/// Returns the updated expression.
925fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
926    match expr {
927        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
928            update_metric_name_matcher(expr, metric_name)
929        }
930        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
931        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
932            update_metric_name_matcher(lhs, metric_name);
933            update_metric_name_matcher(rhs, metric_name);
934        }
935        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
936        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
937            update_metric_name_matcher(expr, metric_name)
938        }
939        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
940            if name.is_some() {
941                return;
942            }
943
944            for m in &mut matchers.matchers {
945                if m.name == METRIC_NAME {
946                    m.op = MatchOp::Equal;
947                    m.value = metric_name.to_string();
948                }
949            }
950        }
951        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
952            let VectorSelector { name, matchers, .. } = vs;
953            if name.is_some() {
954                return;
955            }
956
957            for m in &mut matchers.matchers {
958                if m.name == METRIC_NAME {
959                    m.op = MatchOp::Equal;
960                    m.value = metric_name.to_string();
961                }
962            }
963        }
964        PromqlExpr::Call(Call { args, .. }) => {
965            args.args.iter_mut().for_each(|e| {
966                update_metric_name_matcher(e, metric_name);
967            });
968        }
969        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
970    }
971}
972
/// Query-string parameters accepted by the label values endpoint
/// ([`label_values_query`]).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    /// Start of the time range; defaults to 24h ago when omitted.
    start: Option<String>,
    /// End of the time range; defaults to the current time when omitted.
    end: Option<String>,
    /// Lookback delta; accepted but not read by the label values handler — TODO confirm.
    lookback: Option<String>,
    /// Series selectors gathered from repeated `match[]` parameters.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database string used to resolve the catalog and schema.
    db: Option<String>,
    /// Maximum number of values to return; `None` or `0` means unlimited.
    limit: Option<usize>,
}
983
/// Handles the Prometheus `label/<label_name>/values` endpoint.
///
/// Special labels are answered from catalog metadata: the metric-name label
/// lists table names (excluding physical tables), the field label lists field
/// column names, and the schema/database labels list matching schema names.
/// Any other label requires at least one `match[]` selector; each selector
/// must be a plain vector selector with a metric name, and the label values
/// found for all selectors are merged, sorted, and truncated to `limit`.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    // Resolve catalog/schema from the `db` parameter (if any) before querying.
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time for this request; observation happens on drop.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        // Metric names map to table names in the current schema.
        let catalog_manager = handler.catalog_manager();
        let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
        let mut table_names = Vec::new();
        while let Some(table) = tables_stream.next().await {
            // filter out physical tables
            match table {
                Ok(table) => {
                    if table
                        .table_info()
                        .meta
                        .options
                        .extra_options
                        .contains_key(PHYSICAL_TABLE_METADATA_KEY)
                    {
                        continue;
                    }

                    table_names.push(table.table_info().name.clone());
                }
                Err(e) => {
                    return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
                }
            }
        }
        table_names.sort_unstable();
        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Field label: collect value-column names from the matched tables.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        // Schema/database label: list schema names satisfying the matchers.
        let catalog_manager = handler.catalog_manager();

        match retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await {
            Ok(mut schema_names) => {
                truncate_results(&mut schema_names, params.limit);
                return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
                    schema_names,
                ));
            }
            Err(e) => {
                return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
            }
        }
    }

    // Ordinary labels require at least one series selector.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    // Default time range: the last 24 hours.
    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
        .context(ParseTimestampSnafu { timestamp: &start }));
    let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
        .context(ParseTimestampSnafu { timestamp: &end }));

    for query in queries {
        // Each `match[]` must be a bare vector selector carrying a metric name.
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use and filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(
                name,
                label_name.to_string(),
                matchers,
                start,
                end,
                &query_ctx,
            )
            .await;
        if let Some(result) = handle_schema_err!(result) {
            // Values are merged across all selectors (set semantics).
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1111
/// Trim `label_values` down to at most `limit` entries.
///
/// A `None` or zero limit means "unlimited" and leaves the vector untouched
/// (matching Prometheus, where a non-positive limit disables truncation).
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        // `Vec::truncate` is a no-op when the vector is already short enough.
        Some(n) if n > 0 => label_values.truncate(n),
        _ => {}
    }
}
1119
1120/// Take metric name from the [VectorSelector].
1121/// It takes the name in the selector or removes the name matcher.
1122fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1123    if let Some(name) = selector.name.take() {
1124        return Some(name);
1125    }
1126
1127    let (pos, matcher) = selector
1128        .matchers
1129        .matchers
1130        .iter()
1131        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1132    let name = matcher.value.clone();
1133    // We need to remove the name matcher to avoid using it as a filter in query.
1134    selector.matchers.matchers.remove(pos);
1135
1136    Some(name)
1137}
1138
/// Collect the names of field (value) columns for the tables named in
/// `matches`, within the query context's current catalog and schema.
///
/// When `matches` is empty, every table in the schema is inspected; otherwise
/// each entry is treated as a table name that must exist, or a
/// `TableNotFound` error is returned.
async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // query all tables if no matcher is provided
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        // A named table that cannot be found is an error (unlike the
        // query-all path above, which only surfaces catalog errors).
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.to_string(),
                table: table_name.to_string(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}
1180
/// List the schemas (in the current catalog) that contain every metric
/// referenced by the `matches` selectors, sorted by name.
///
/// A match item that does not resolve to exactly one metric name places no
/// constraint on the result; with empty `matches`, all schemas are returned.
async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    for schema in candidate_schemas {
        // A schema qualifies only if every resolvable metric exists in it.
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}
1218
1219/// Try to parse and extract the name of referenced metric from the promql query.
1220///
1221/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1222/// Multiple references to the same metric are allowed.
1223fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1224    let promql_expr = promql_parser::parser::parse(query).ok()?;
1225    promql_expr_to_metric_name(&promql_expr)
1226}
1227
/// Parameters for the Prometheus series endpoint ([`series_query`]),
/// accepted both in the query string and in the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Start of the time range; defaults to 24h ago when omitted.
    start: Option<String>,
    /// End of the time range; defaults to the current time when omitted.
    end: Option<String>,
    /// Lookback delta passed to the PromQL engine; defaults to
    /// `DEFAULT_LOOKBACK_STRING` when omitted.
    lookback: Option<String>,
    /// Series selectors gathered from repeated `match[]` parameters.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database string used to resolve the catalog and schema.
    db: Option<String>,
}
1237
/// Handles the Prometheus series endpoint: evaluates each `match[]` selector
/// and returns the label sets of the matched time series.
///
/// Parameters may arrive via the query string or the form body; the query
/// string takes precedence. At least one `match[]` selector is required.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    // Query-string `match[]` wins; fall back to the form body.
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    // Default time range: the last 24 hours.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time for this request; observation happens on drop.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        // Best-effort metric name; empty when the selector references zero or
        // multiple distinct metrics.
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    // Surface the merged per-query execution metrics in the response metadata.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1317
/// Parameters for the query-parsing endpoint ([`parse_query`]).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL expression to parse.
    query: Option<String>,
    /// Accepted for API symmetry; not read by the parse handler.
    db: Option<String>,
}
1323
1324#[axum_macros::debug_handler]
1325#[tracing::instrument(
1326    skip_all,
1327    fields(protocol = "prometheus", request_type = "parse_query")
1328)]
1329pub async fn parse_query(
1330    State(_handler): State<PrometheusHandlerRef>,
1331    Query(params): Query<ParseQuery>,
1332    Extension(_query_ctx): Extension<QueryContext>,
1333    Form(form_params): Form<ParseQuery>,
1334) -> PrometheusJsonResponse {
1335    if let Some(query) = params.query.or(form_params.query) {
1336        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1337        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1338    } else {
1339        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1340    }
1341}
1342
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        // Human-readable label used in assertion failure messages.
        name: &'static str,
        // The PromQL text under test.
        promql: &'static str,
        // Expected unique metric name; `None` when zero or several distinct
        // metrics are referenced.
        expected_metric: Option<&'static str>,
        // Expected PromQL value type of the expression.
        expected_type: ValueType,
        // Whether parsing/analysis is expected to fail.
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Single metric cases
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregation and function cases
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Same metric binary operations
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) by (instance) + 100"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Multiple metrics cases
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unless operator cases
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subquery cases
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literal values
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Error cases
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        // Drive every case through the function and check name, type, and
        // error expectations, labeling failures with the case name.
        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}