servers/http/
prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Provides Prometheus-compliant HTTP API server endpoints.
16
17use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54    SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use query::promql::planner::normalize_matcher;
58use serde::de::{self, MapAccess, Visitor};
59use serde::{Deserialize, Serialize};
60use serde_json::Value;
61use session::context::{QueryContext, QueryContextRef};
62use snafu::{Location, OptionExt, ResultExt};
63use store_api::metric_engine_consts::{
64    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
65};
66
67pub use super::result::prometheus_resp::PrometheusJsonResponse;
68use crate::error::{
69    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
70    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
71    UnexpectedResultSnafu,
72};
73use crate::http::header::collect_plan_metrics;
74use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
75use crate::prometheus_handler::PrometheusHandlerRef;
76
/// A single series in a [ValueType::Vector] (instant query) result.
///
/// Mirrors one entry of the Prometheus `data.result` array: a label set plus
/// an optional single sample.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name -> label value, including the `__name__` label.
    pub metric: BTreeMap<String, String>,
    /// The single sample as `(timestamp, value rendered as string)`;
    /// omitted from the JSON output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
84
/// A single series in a [ValueType::Matrix] (range query) result.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name -> label value, including the `__name__` label.
    pub metric: BTreeMap<String, String>,
    /// All samples of the series, each as `(timestamp, value rendered as string)`.
    pub values: Vec<(f64, String)>,
}
91
/// Variants corresponding to [ValueType].
///
/// Serialized untagged so each variant matches the JSON shape Prometheus
/// uses for the corresponding `resultType`.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// Range-query result: many samples per series.
    Matrix(Vec<PromSeriesMatrix>),
    /// Instant-query result: one sample per series.
    Vector(Vec<PromSeriesVector>),
    /// A single scalar sample; `None` is skipped during serialization.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// A string result, carried in the same `(timestamp, text)` shape.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
101
102impl Default for PromQueryResult {
103    fn default() -> Self {
104        PromQueryResult::Matrix(Default::default())
105    }
106}
107
/// The `data` object of a Prometheus query response: the result type tag
/// (the display form of a [ValueType], e.g. "matrix") plus the result payload.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
114
/// A cheaply-clonable holder ([Arc]) for a column name,
/// avoiding repeated [String] clones when used as a [HashMap] key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
119
120impl From<&str> for Column {
121    fn from(s: &str) -> Self {
122        Self(Arc::new(s.to_string()))
123    }
124}
125
/// The `data` payload variants of the endpoints in this module.
///
/// Serialized untagged so each handler emits exactly the JSON shape the
/// corresponding Prometheus API endpoint expects.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    /// Query result (`/query`, `/query_range`).
    PromData(PromData),
    /// Label names (`/labels`).
    Labels(Vec<String>),
    /// Series label sets (`/series`).
    Series(Vec<HashMap<Column, String>>),
    /// Values of one label (`/label/<name>/values`).
    LabelValues(Vec<String>),
    /// A prettified PromQL expression (`/format_query`).
    FormatQuery(String),
    /// Server build metadata (`/status/buildinfo`).
    BuildInfo(OwnedBuildInfo),
    /// A parsed PromQL AST (`/parse_query`); output-only.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
140
141impl PrometheusResponse {
142    /// Append the other [`PrometheusResponse]`.
143    /// # NOTE
144    ///   Only append matrix and vector results, otherwise just ignore the other response.
145    fn append(&mut self, other: PrometheusResponse) {
146        match (self, other) {
147            (
148                PrometheusResponse::PromData(PromData {
149                    result: PromQueryResult::Matrix(lhs),
150                    ..
151                }),
152                PrometheusResponse::PromData(PromData {
153                    result: PromQueryResult::Matrix(rhs),
154                    ..
155                }),
156            ) => {
157                lhs.extend(rhs);
158            }
159
160            (
161                PrometheusResponse::PromData(PromData {
162                    result: PromQueryResult::Vector(lhs),
163                    ..
164                }),
165                PrometheusResponse::PromData(PromData {
166                    result: PromQueryResult::Vector(rhs),
167                    ..
168                }),
169            ) => {
170                lhs.extend(rhs);
171            }
172            _ => {
173                // TODO(dennis): process other cases?
174            }
175        }
176    }
177
178    pub fn is_none(&self) -> bool {
179        matches!(self, PrometheusResponse::None)
180    }
181}
182
/// Query-string/form parameters accepted by the `format_query` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    // The PromQL expression to prettify.
    query: Option<String>,
}
187
188#[axum_macros::debug_handler]
189#[tracing::instrument(
190    skip_all,
191    fields(protocol = "prometheus", request_type = "format_query")
192)]
193pub async fn format_query(
194    State(_handler): State<PrometheusHandlerRef>,
195    Query(params): Query<InstantQuery>,
196    Extension(_query_ctx): Extension<QueryContext>,
197    Form(form_params): Form<InstantQuery>,
198) -> PrometheusJsonResponse {
199    let query = params.query.or(form_params.query).unwrap_or_default();
200    match promql_parser::parser::parse(&query) {
201        Ok(expr) => {
202            let pretty = expr.prettify();
203            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
204        }
205        Err(reason) => {
206            let err = InvalidQuerySnafu { reason }.build();
207            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
208        }
209    }
210}
211
/// Parameters of the build-info endpoint (none are currently accepted).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
214
215#[axum_macros::debug_handler]
216#[tracing::instrument(
217    skip_all,
218    fields(protocol = "prometheus", request_type = "build_info_query")
219)]
220pub async fn build_info_query() -> PrometheusJsonResponse {
221    let build_info = common_version::build_info().clone();
222    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
223}
224
/// Query-string/form parameters accepted by the instant-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    // The PromQL expression to evaluate.
    query: Option<String>,
    // Lookback delta override; falls back to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    // Evaluation timestamp; defaults to the current server time.
    time: Option<String>,
    // Accepted for API compatibility; not read by the handler below.
    timeout: Option<String>,
    // Target database as a "catalog-schema" string.
    db: Option<String>,
}
233
/// Helper macro that tries to evaluate the expression and yields its result.
/// If the evaluation fails, returns a `PrometheusJsonResponse` early.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                // Handlers return a plain response type, so failures are
                // surfaced as an InvalidArguments response rather than `Err`.
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
247
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
/// Prometheus instant-query endpoint handler.
///
/// Evaluates a PromQL expression at a single timestamp. When the query
/// restricts `__name__` with non-equality matchers, it is fanned out into one
/// rewritten query per matching metric and the responses are merged.
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modeled as a degenerate range query whose start and
    // end are both `time`.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    // Fan-out path: resolve the concrete metric names first, then run one
    // rewritten query per metric and merge all responses.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // No metric matched: answer with an empty result of the right type.
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Pin the expression to one concrete metric name and re-render it.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: `metric_names` is non-empty (checked above), so there is at
        // least one response to reduce.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
339
340/// Executes a single instant query and returns response
341async fn do_instant_query(
342    handler: &PrometheusHandlerRef,
343    prom_query: &PromQuery,
344    query_ctx: QueryContextRef,
345) -> PrometheusJsonResponse {
346    let result = handler.do_query(prom_query, query_ctx).await;
347    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
348        Ok((metric_name, result_type)) => (metric_name, result_type),
349        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
350    };
351    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
352}
353
/// Query-string/form parameters accepted by the range-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    // The PromQL expression to evaluate.
    query: Option<String>,
    // Evaluation window start/end timestamps.
    start: Option<String>,
    end: Option<String>,
    // Step between evaluation points.
    step: Option<String>,
    // Lookback delta override; falls back to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    // Accepted for API compatibility; not read by the handler below.
    timeout: Option<String>,
    // Target database as a "catalog-schema" string.
    db: Option<String>,
}
364
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
/// Prometheus range-query endpoint handler.
///
/// Evaluates a PromQL expression over `[start, end]` at `step` intervals.
/// When the query restricts `__name__` with non-equality matchers, it is
/// fanned out into one rewritten query per matching metric and the responses
/// are merged into a single matrix result.
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters take precedence over the form body.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    // Fan-out path: resolve the concrete metric names first, then run one
    // rewritten query per metric and merge all responses.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // No metric matched: a range query always yields a matrix.
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Pin the expression to one concrete metric name and re-render it.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: there is at least one response, as checked above.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
449
450/// Executes a single range query and returns response
451async fn do_range_query(
452    handler: &PrometheusHandlerRef,
453    prom_query: &PromQuery,
454    query_ctx: QueryContextRef,
455) -> PrometheusJsonResponse {
456    let result = handler.do_query(prom_query, query_ctx).await;
457    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
458        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
459        Ok((metric_name, _)) => metric_name,
460    };
461    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
462}
463
/// Collected values of the repeated `match[]` query parameter.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
466
/// Query-string/form parameters accepted by the labels endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    // Time window bounds for the matcher queries.
    start: Option<String>,
    end: Option<String>,
    // Lookback delta override; falls back to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    /// Repeated `match[]` series selectors, collected by the custom
    /// deserializer of [Matches] via flattening.
    #[serde(flatten)]
    matches: Matches,
    // Target database as a "catalog-schema" string.
    db: Option<String>,
}
476
477// Custom Deserialize method to support parsing repeated match[]
478impl<'de> Deserialize<'de> for Matches {
479    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
480    where
481        D: de::Deserializer<'de>,
482    {
483        struct MatchesVisitor;
484
485        impl<'d> Visitor<'d> for MatchesVisitor {
486            type Value = Vec<String>;
487
488            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
489                formatter.write_str("a string")
490            }
491
492            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
493            where
494                M: MapAccess<'d>,
495            {
496                let mut matches = Vec::new();
497                while let Some((key, value)) = access.next_entry::<String, String>()? {
498                    if key == "match[]" {
499                        matches.push(value);
500                    }
501                }
502                Ok(matches)
503            }
504        }
505        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
506    }
507}
508
/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
///
/// Must be used inside a handler returning `PrometheusJsonResponse`, since
/// the error arm performs an early `return`.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Prometheus won't report error if querying nonexist label and metric
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
532
533#[axum_macros::debug_handler]
534#[tracing::instrument(
535    skip_all,
536    fields(protocol = "prometheus", request_type = "labels_query")
537)]
538pub async fn labels_query(
539    State(handler): State<PrometheusHandlerRef>,
540    Query(params): Query<LabelsQuery>,
541    Extension(mut query_ctx): Extension<QueryContext>,
542    Form(form_params): Form<LabelsQuery>,
543) -> PrometheusJsonResponse {
544    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
545    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
546    let query_ctx = Arc::new(query_ctx);
547
548    let mut queries = params.matches.0;
549    if queries.is_empty() {
550        queries = form_params.matches.0;
551    }
552
553    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
554        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
555        .start_timer();
556
557    // Fetch all tag columns. It will be used as white-list for tag names.
558    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
559    {
560        Ok(labels) => labels,
561        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
562    };
563    // insert the special metric name label
564    let _ = labels.insert(METRIC_NAME.to_string());
565
566    // Fetch all columns if no query matcher is provided
567    if queries.is_empty() {
568        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
569        labels_vec.sort_unstable();
570        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
571    }
572
573    // Otherwise, run queries and extract column name from result set.
574    let start = params
575        .start
576        .or(form_params.start)
577        .unwrap_or_else(yesterday_rfc3339);
578    let end = params
579        .end
580        .or(form_params.end)
581        .unwrap_or_else(current_time_rfc3339);
582    let lookback = params
583        .lookback
584        .or(form_params.lookback)
585        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
586
587    let mut fetched_labels = HashSet::new();
588    let _ = fetched_labels.insert(METRIC_NAME.to_string());
589
590    let mut merge_map = HashMap::new();
591    for query in queries {
592        let prom_query = PromQuery {
593            query,
594            start: start.clone(),
595            end: end.clone(),
596            step: DEFAULT_LOOKBACK_STRING.to_string(),
597            lookback: lookback.clone(),
598            alias: None,
599        };
600
601        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
602        handle_schema_err!(
603            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
604                .await
605        );
606    }
607
608    // intersect `fetched_labels` with `labels` to filter out non-tag columns
609    fetched_labels.retain(|l| labels.contains(l));
610    let _ = labels.insert(METRIC_NAME.to_string());
611
612    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
613    sorted_labels.sort();
614    let merge_map = merge_map
615        .into_iter()
616        .map(|(k, v)| (k, Value::from(v)))
617        .collect();
618    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
619    resp.resp_metrics = merge_map;
620    resp
621}
622
623/// Get all tag column name of the given schema
624async fn get_all_column_names(
625    catalog: &str,
626    schema: &str,
627    manager: &CatalogManagerRef,
628) -> std::result::Result<HashSet<String>, catalog::error::Error> {
629    let table_names = manager.table_names(catalog, schema, None).await?;
630
631    let mut labels = HashSet::new();
632    for table_name in table_names {
633        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
634            continue;
635        };
636        for column in table.primary_key_columns() {
637            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
638                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
639            {
640                labels.insert(column.name);
641            }
642        }
643    }
644
645    Ok(labels)
646}
647
648async fn retrieve_series_from_query_result(
649    result: Result<Output>,
650    series: &mut Vec<HashMap<Column, String>>,
651    query_ctx: &QueryContext,
652    table_name: &str,
653    manager: &CatalogManagerRef,
654    metrics: &mut HashMap<String, u64>,
655) -> Result<()> {
656    let result = result?;
657
658    // fetch tag list
659    let table = manager
660        .table(
661            query_ctx.current_catalog(),
662            &query_ctx.current_schema(),
663            table_name,
664            Some(query_ctx),
665        )
666        .await
667        .context(CatalogSnafu)?
668        .with_context(|| TableNotFoundSnafu {
669            catalog: query_ctx.current_catalog(),
670            schema: query_ctx.current_schema(),
671            table: table_name,
672        })?;
673    let tag_columns = table
674        .primary_key_columns()
675        .map(|c| c.name)
676        .collect::<HashSet<_>>();
677
678    match result.data {
679        OutputData::RecordBatches(batches) => {
680            record_batches_to_series(batches, series, table_name, &tag_columns)
681        }
682        OutputData::Stream(stream) => {
683            let batches = RecordBatches::try_collect(stream)
684                .await
685                .context(CollectRecordbatchSnafu)?;
686            record_batches_to_series(batches, series, table_name, &tag_columns)
687        }
688        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
689            reason: "expected data result, but got affected rows".to_string(),
690            location: Location::default(),
691        }),
692    }?;
693
694    if let Some(ref plan) = result.meta.plan {
695        collect_plan_metrics(plan, &mut [metrics]);
696    }
697    Ok(())
698}
699
/// Retrieve labels name from query result
///
/// Collects every label (column) name appearing in `result` into `labels`,
/// and merges the query plan's metrics into `metrics`.
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            // Streamed output: drain the stream into in-memory batches first.
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        // A DML-style result is unexpected for a label query.
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
725
726fn record_batches_to_series(
727    batches: RecordBatches,
728    series: &mut Vec<HashMap<Column, String>>,
729    table_name: &str,
730    tag_columns: &HashSet<String>,
731) -> Result<()> {
732    for batch in batches.iter() {
733        // project record batch to only contains tag columns
734        let projection = batch
735            .schema
736            .column_schemas()
737            .iter()
738            .enumerate()
739            .filter_map(|(idx, col)| {
740                if tag_columns.contains(&col.name) {
741                    Some(idx)
742                } else {
743                    None
744                }
745            })
746            .collect::<Vec<_>>();
747        let batch = batch
748            .try_project(&projection)
749            .context(CollectRecordbatchSnafu)?;
750
751        let mut writer = RowWriter::new(&batch.schema, table_name);
752        writer.write(batch, series)?;
753    }
754    Ok(())
755}
756
/// Writer from a row in the record batch to a Prometheus time series:
///
/// `{__name__="<metric name>", <label name>="<label value>", ...}`
///
/// The metrics name is the table name; label names are the column names and
/// the label values are the corresponding row values (all are converted to strings).
struct RowWriter {
    /// The template that is to produce a Prometheus time series. It is pre-filled with metrics name
    /// and label names, waiting to be filled by row values afterward.
    template: HashMap<Column, Option<String>>,
    /// The current filling row. `None` until the first value of a row is
    /// inserted; taken (reset to `None`) by `finish`.
    current: Option<HashMap<Column, Option<String>>>,
}
770
771impl RowWriter {
772    fn new(schema: &SchemaRef, table: &str) -> Self {
773        let mut template = schema
774            .column_schemas()
775            .iter()
776            .map(|x| (x.name.as_str().into(), None))
777            .collect::<HashMap<Column, Option<String>>>();
778        template.insert("__name__".into(), Some(table.to_string()));
779        Self {
780            template,
781            current: None,
782        }
783    }
784
    /// Sets `column`'s value in the row currently being built, lazily cloning
    /// the template on the first insert of each row.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // NOTE(review): `ColumnRef`/`AsColumnRef` are defined elsewhere in
        // this file; the cast presumably allows a map lookup without
        // allocating an owned `Column` key — confirm at their definitions.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                // Column absent from the template: insert it as an owned key.
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }
796
797    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
798        let column_name = column_schema.name.as_str().into();
799
800        if column_schema.data_type.is_json() {
801            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
802            self.insert(column_name, s);
803        } else {
804            let hex = bytes
805                .iter()
806                .map(|b| format!("{b:02x}"))
807                .collect::<Vec<String>>()
808                .join("");
809            self.insert(column_name, hex);
810        }
811        Ok(())
812    }
813
814    fn finish(&mut self) -> HashMap<Column, String> {
815        let Some(current) = self.current.take() else {
816            return HashMap::new();
817        };
818        current
819            .into_iter()
820            .filter_map(|(k, v)| v.map(|v| (k, v)))
821            .collect()
822    }
823
    /// Converts every row of `record_batch` into a label-name → string-value
    /// map and appends one map per row to `series`.
    ///
    /// Each cell is rendered to text according to its arrow [`DataType`] and
    /// accumulated via `self.insert` (which stores the stringified value in
    /// the current row); `self.finish()` then materializes the row.
    ///
    /// # Errors
    /// - `NotSupported` for arrow data types with no text conversion below.
    /// - A conversion error if a binary column holds malformed JSONB.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        // Row-major traversal: build one label map per row.
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // Null cells are rendered as the literal string "Null",
                // regardless of the column's data type.
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    // Booleans and numeric primitives: downcast to the
                    // concrete arrow array and hand the value to `insert`.
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // All three string encodings insert the borrowed &str.
                    DataType::Utf8 => {
                        let array = array.as_string::<i32>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::LargeUtf8 => {
                        let array = array.as_string::<i64>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8View => {
                        let array = array.as_string_view();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Binary columns go through `insert_bytes`, which renders
                    // JSONB payloads as JSON text and other payloads as
                    // lowercase hex (based on the column's logical type).
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::LargeBinary => {
                        let array = array.as_binary::<i64>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::BinaryView => {
                        let array = array.as_binary_view();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // `Date64` values are milliseconds representation of `Date32` values,
                        // according to its specification. So we convert the `Date64` value here to
                        // the `Date32` value to process them unified.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    // Timestamps and times are rendered in ISO-8601 form.
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    // Intervals: each unit has its own arrow representation;
                    // all are rendered via their ISO-8601 string form.
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    // Nested types fall back to DataFusion's `ScalarValue`
                    // rendering rather than a hand-rolled formatter.
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    // Anything else has no defined text form for the HTTP
                    // response; surface it as an explicit NotSupported error.
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            // `finish()` drains the accumulated row into an owned map.
            series.push(self.finish())
        }
        Ok(())
    }
1000}
1001
/// A borrowed, `Copy` view of a column name — the allocation-free lookup-key
/// counterpart of the owned `Column` type for map queries.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);
1004
/// Allows `"name".into()` to produce a `ColumnRef` wherever one is expected.
impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}
1010
/// Unifying view over owned (`Column`) and borrowed (`ColumnRef`) column
/// names. Combined with the `Borrow<dyn AsColumnRef>` impl on `Column`, this
/// lets a `HashMap<Column, _>` be queried by a plain `&str` without
/// allocating an owned key (the same idea as `Borrow<str> for String`).
trait AsColumnRef {
    /// Returns a borrowed view of the column name.
    fn as_ref(&self) -> ColumnRef<'_>;
}
1014
1015impl AsColumnRef for Column {
1016    fn as_ref(&self) -> ColumnRef<'_> {
1017        self.0.as_str().into()
1018    }
1019}
1020
1021impl AsColumnRef for ColumnRef<'_> {
1022    fn as_ref(&self) -> ColumnRef<'_> {
1023        *self
1024    }
1025}
1026
/// Equality of the trait object is equality of the underlying column names
/// (via `ColumnRef`'s derived `PartialEq` on the inner `&str`). Required so
/// `dyn AsColumnRef` can serve as a `HashMap` lookup key.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}
1032
// String equality is reflexive, symmetric and transitive, so `Eq` is sound.
impl<'a> Eq for dyn AsColumnRef + 'a {}
1034
impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash only the underlying name. NOTE(review): the `Borrow`-based map
        // lookup is only sound if `Column`'s own `Hash` impl produces the
        // same hash (i.e. hashes just its inner string) — confirm against
        // `Column`'s definition.
        self.as_ref().0.hash(state);
    }
}
1040
/// Lets a `HashMap<Column, _>` be queried with a `&dyn AsColumnRef` (and
/// therefore with a borrowed `ColumnRef`) instead of constructing an owned
/// `Column` key — mirroring the standard `Borrow<str> for String` pattern.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        // Unsized coercion from `&Column` to `&dyn AsColumnRef`.
        self
    }
}
1046
1047/// Retrieve labels name from record batches
1048fn record_batches_to_labels_name(
1049    batches: RecordBatches,
1050    labels: &mut HashSet<String>,
1051) -> Result<()> {
1052    let mut column_indices = Vec::new();
1053    let mut field_column_indices = Vec::new();
1054    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1055        if let ConcreteDataType::Float64(_) = column.data_type {
1056            field_column_indices.push(i);
1057        }
1058        column_indices.push(i);
1059    }
1060
1061    if field_column_indices.is_empty() {
1062        return Err(Error::Internal {
1063            err_msg: "no value column found".to_string(),
1064        });
1065    }
1066
1067    for batch in batches.iter() {
1068        let names = column_indices
1069            .iter()
1070            .map(|c| batches.schema().column_name_by_index(*c).to_string())
1071            .collect::<Vec<_>>();
1072
1073        let field_columns = field_column_indices
1074            .iter()
1075            .map(|i| {
1076                let column = batch.column(*i);
1077                column.as_primitive::<Float64Type>()
1078            })
1079            .collect::<Vec<_>>();
1080
1081        for row_index in 0..batch.num_rows() {
1082            // if all field columns are null, skip this row
1083            if field_columns.iter().all(|c| c.is_null(row_index)) {
1084                continue;
1085            }
1086
1087            // if a field is not null, record the tag name and return
1088            names.iter().for_each(|name| {
1089                let _ = labels.insert(name.clone());
1090            });
1091            return Ok(());
1092        }
1093    }
1094    Ok(())
1095}
1096
1097pub(crate) fn retrieve_metric_name_and_result_type(
1098    promql: &str,
1099) -> Result<(Option<String>, ValueType)> {
1100    let promql_expr = promql_parser::parser::parse(promql)
1101        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1102    let metric_name = promql_expr_to_metric_name(&promql_expr);
1103    let result_type = promql_expr.value_type();
1104
1105    Ok((metric_name, result_type))
1106}
1107
1108/// Tries to get catalog and schema from an optional db param. And retrieves
1109/// them from [QueryContext] if they don't present.
1110pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1111    if let Some(db) = db {
1112        parse_catalog_and_schema_from_db_string(db)
1113    } else {
1114        (
1115            ctx.current_catalog().to_string(),
1116            ctx.current_schema().clone(),
1117        )
1118    }
1119}
1120
1121/// Update catalog and schema in [QueryContext] if necessary.
1122pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1123    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1124        ctx.set_current_catalog(catalog);
1125        ctx.set_current_schema(schema);
1126    }
1127}
1128
1129fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1130    let mut metric_names = HashSet::new();
1131    collect_metric_names(expr, &mut metric_names);
1132
1133    // Return the metric name only if there's exactly one unique metric name
1134    if metric_names.len() == 1 {
1135        metric_names.into_iter().next()
1136    } else {
1137        None
1138    }
1139}
1140
/// Recursively collect all metric names from a PromQL expression
///
/// `metric_names` is the accumulator. Clearing it (and stopping descent)
/// signals "the metric name cannot be determined for this expression", so
/// the caller treats the result as ambiguous.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    // `by (...)` keeps only the listed labels; unless
                    // `__name__` is among them, the metric name is dropped
                    // from the output and no single name can be reported.
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    // `without (__name__)` explicitly drops the metric name.
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        // Unary negation yields a value expression with no metric name.
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators return series from the left-hand side, so only
            // the LHS can contribute the metric name; all other binary
            // operators (arithmetic, comparison) drop it.
            if matches!(
                op.id(),
                token::T_LAND // INTERSECT
                    | token::T_LOR // UNION
                    | token::T_LUNLESS // EXCEPT
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Prefer the explicit metric name; otherwise fall back to the
            // first `__name__` matcher (only the first one is considered).
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            // Same rule as the vector selector, applied to the inner one.
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            // Function calls: every argument may reference a metric.
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1200
1201fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1202where
1203    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1204{
1205    match expr {
1206        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1207        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1208        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1209            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1210        }
1211        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1212        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1213        PromqlExpr::NumberLiteral(_) => None,
1214        PromqlExpr::StringLiteral(_) => None,
1215        PromqlExpr::Extension(_) => None,
1216        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1217        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1218            let VectorSelector { name, matchers, .. } = vs;
1219
1220            f(name, matchers)
1221        }
1222        PromqlExpr::Call(Call { args, .. }) => args
1223            .args
1224            .iter()
1225            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1226    }
1227}
1228
1229/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
1230fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
1231    find_metric_name_and_matchers(expr, |name, matchers| {
1232        // Has name, ignore the matchers
1233        if name.is_some() {
1234            return None;
1235        }
1236
1237        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
1238        Some(matchers.find_matchers(METRIC_NAME))
1239    })
1240    .map(|matchers| {
1241        matchers
1242            .into_iter()
1243            .filter(|m| !matches!(m.op, MatchOp::Equal))
1244            .map(normalize_matcher)
1245            .collect::<Vec<_>>()
1246    })
1247}
1248
1249/// Update the `__name__` matchers in expression into special value
1250/// Returns the updated expression.
1251fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1252    match expr {
1253        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1254            update_metric_name_matcher(expr, metric_name)
1255        }
1256        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1257        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1258            update_metric_name_matcher(lhs, metric_name);
1259            update_metric_name_matcher(rhs, metric_name);
1260        }
1261        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1262        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1263            update_metric_name_matcher(expr, metric_name)
1264        }
1265        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1266            if name.is_some() {
1267                return;
1268            }
1269
1270            for m in &mut matchers.matchers {
1271                if m.name == METRIC_NAME {
1272                    m.op = MatchOp::Equal;
1273                    m.value = metric_name.to_string();
1274                }
1275            }
1276        }
1277        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1278            let VectorSelector { name, matchers, .. } = vs;
1279            if name.is_some() {
1280                return;
1281            }
1282
1283            for m in &mut matchers.matchers {
1284                if m.name == METRIC_NAME {
1285                    m.op = MatchOp::Equal;
1286                    m.value = metric_name.to_string();
1287                }
1288            }
1289        }
1290        PromqlExpr::Call(Call { args, .. }) => {
1291            args.args.iter_mut().for_each(|e| {
1292                update_metric_name_matcher(e, metric_name);
1293            });
1294        }
1295        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1296    }
1297}
1298
/// Query parameters accepted by the Prometheus label-values endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    // Start of the time range; defaults to 24h ago when absent.
    start: Option<String>,
    // End of the time range; defaults to "now" when absent.
    end: Option<String>,
    // Lookback delta; accepted for API compatibility (not read by the
    // label-values handler itself).
    lookback: Option<String>,
    // `match[]` series selectors, captured via the custom `Matches` type.
    #[serde(flatten)]
    matches: Matches,
    // Optional database in the `catalog-schema` string form.
    db: Option<String>,
    // Maximum number of returned values; `None` or `0` means unlimited.
    limit: Option<usize>,
}
1309
/// Handler for the Prometheus label-values HTTP endpoint
/// (`/api/v1/label/<label_name>/values`).
///
/// Special label names are served from catalog metadata: `METRIC_NAME_LABEL`
/// returns table names, `FIELD_NAME_LABEL` returns field column names, and
/// `SCHEMA_LABEL`/`DATABASE_LABEL` return schema names. Any other label is
/// resolved by running a label-values query per `match[]` selector.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    // Resolve catalog/schema from the optional `db` param and pin them in
    // the query context before taking it by Arc.
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time for this endpoint until dropped.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        // `__name__` values are the metric (table) names.
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Field-name queries: schema errors degrade to an empty result
        // (via `handle_schema_err!`) rather than failing the request.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        // Schema/database label: list schema names in the current catalog.
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    // Ordinary label: at least one `match[]` selector is mandatory.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    // Default time range: the last 24 hours.
    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        // Each selector must be a plain vector selector with a resolvable
        // metric name; the name matcher is removed so it is not used as a
        // label filter in the underlying query.
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use and filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    // Deduplicated (HashSet) values, sorted and optionally limited.
    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1409
/// Truncates `label_values` to at most `limit` entries.
///
/// A `limit` of `None` or `Some(0)` means "no limit" (matching the
/// Prometheus API convention). The previous explicit `len() >= limit` check
/// was redundant — `Vec::truncate` is already a no-op when the vector is
/// shorter than the limit.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    if let Some(limit) = limit.filter(|l| *l > 0) {
        label_values.truncate(limit);
    }
}
1418
1419/// Take metric name from the [VectorSelector].
1420/// It takes the name in the selector or removes the name matcher.
1421fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1422    if let Some(name) = selector.name.take() {
1423        return Some(name);
1424    }
1425
1426    let (pos, matcher) = selector
1427        .matchers
1428        .matchers
1429        .iter()
1430        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1431    let name = matcher.value.clone();
1432    // We need to remove the name matcher to avoid using it as a filter in query.
1433    selector.matchers.matchers.remove(pos);
1434
1435    Some(name)
1436}
1437
1438async fn retrieve_table_names(
1439    query_ctx: &QueryContext,
1440    catalog_manager: CatalogManagerRef,
1441    matches: Vec<String>,
1442) -> Result<Vec<String>> {
1443    let catalog = query_ctx.current_catalog();
1444    let schema = query_ctx.current_schema();
1445
1446    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
1447    let mut table_names = Vec::new();
1448
1449    // we only provide very limited support for matcher against __name__
1450    let name_matcher = matches
1451        .first()
1452        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
1453        .and_then(|expr| {
1454            if let PromqlExpr::VectorSelector(vector_selector) = expr {
1455                let matchers = vector_selector.matchers.matchers;
1456                for matcher in matchers {
1457                    if matcher.name == METRIC_NAME_LABEL {
1458                        return Some(matcher);
1459                    }
1460                }
1461
1462                None
1463            } else {
1464                None
1465            }
1466        });
1467
1468    while let Some(table) = tables_stream.next().await {
1469        let table = table.context(CatalogSnafu)?;
1470        if !table
1471            .table_info()
1472            .meta
1473            .options
1474            .extra_options
1475            .contains_key(LOGICAL_TABLE_METADATA_KEY)
1476        {
1477            // skip non-prometheus (non-metricengine) tables for __name__ query
1478            continue;
1479        }
1480
1481        let table_name = &table.table_info().name;
1482
1483        if let Some(matcher) = &name_matcher {
1484            match &matcher.op {
1485                MatchOp::Equal => {
1486                    if table_name == &matcher.value {
1487                        table_names.push(table_name.clone());
1488                    }
1489                }
1490                MatchOp::Re(reg) => {
1491                    if reg.is_match(table_name) {
1492                        table_names.push(table_name.clone());
1493                    }
1494                }
1495                _ => {
1496                    // != and !~ are not supported:
1497                    // vector must contains at least one non-empty matcher
1498                    table_names.push(table_name.clone());
1499                }
1500            }
1501        } else {
1502            table_names.push(table_name.clone());
1503        }
1504    }
1505
1506    table_names.sort_unstable();
1507    Ok(table_names)
1508}
1509
1510async fn retrieve_field_names(
1511    query_ctx: &QueryContext,
1512    manager: CatalogManagerRef,
1513    matches: Vec<String>,
1514) -> Result<HashSet<String>> {
1515    let mut field_columns = HashSet::new();
1516    let catalog = query_ctx.current_catalog();
1517    let schema = query_ctx.current_schema();
1518
1519    if matches.is_empty() {
1520        // query all tables if no matcher is provided
1521        while let Some(table) = manager
1522            .tables(catalog, &schema, Some(query_ctx))
1523            .next()
1524            .await
1525        {
1526            let table = table.context(CatalogSnafu)?;
1527            for column in table.field_columns() {
1528                field_columns.insert(column.name);
1529            }
1530        }
1531        return Ok(field_columns);
1532    }
1533
1534    for table_name in matches {
1535        let table = manager
1536            .table(catalog, &schema, &table_name, Some(query_ctx))
1537            .await
1538            .context(CatalogSnafu)?
1539            .with_context(|| TableNotFoundSnafu {
1540                catalog: catalog.to_string(),
1541                schema: schema.clone(),
1542                table: table_name.clone(),
1543            })?;
1544
1545        for column in table.field_columns() {
1546            field_columns.insert(column.name);
1547        }
1548    }
1549    Ok(field_columns)
1550}
1551
1552async fn retrieve_schema_names(
1553    query_ctx: &QueryContext,
1554    catalog_manager: CatalogManagerRef,
1555    matches: Vec<String>,
1556) -> Result<Vec<String>> {
1557    let mut schemas = Vec::new();
1558    let catalog = query_ctx.current_catalog();
1559
1560    let candidate_schemas = catalog_manager
1561        .schema_names(catalog, Some(query_ctx))
1562        .await
1563        .context(CatalogSnafu)?;
1564
1565    for schema in candidate_schemas {
1566        let mut found = true;
1567        for match_item in &matches {
1568            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1569                let exists = catalog_manager
1570                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1571                    .await
1572                    .context(CatalogSnafu)?;
1573                if !exists {
1574                    found = false;
1575                    break;
1576                }
1577            }
1578        }
1579
1580        if found {
1581            schemas.push(schema);
1582        }
1583    }
1584
1585    schemas.sort_unstable();
1586
1587    Ok(schemas)
1588}
1589
1590/// Try to parse and extract the name of referenced metric from the promql query.
1591///
1592/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1593/// Multiple references to the same metric are allowed.
1594fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1595    let promql_expr = promql_parser::parser::parse(query).ok()?;
1596    promql_expr_to_metric_name(&promql_expr)
1597}
1598
/// Query parameters accepted by the Prometheus series endpoint; the same
/// shape is accepted from both the query string and the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    // Start of the time range; defaults to 24h ago when absent.
    start: Option<String>,
    // End of the time range; defaults to "now" when absent.
    end: Option<String>,
    // Lookback delta; falls back to the server default when absent.
    lookback: Option<String>,
    // `match[]` series selectors, captured via the custom `Matches` type.
    #[serde(flatten)]
    matches: Matches,
    // Optional database in the `catalog-schema` string form.
    db: Option<String>,
}
1608
/// Handler for the Prometheus series HTTP endpoint (`/api/v1/series`).
///
/// Runs each `match[]` selector as a PromQL query and flattens the results
/// into one label map per series. Query-string parameters take precedence
/// over their form-body counterparts.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    // Selectors may arrive via query string or form body; the query string
    // wins when both are present.
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    // Default time range: the last 24 hours.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time for this endpoint until dropped.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    // Accumulates per-query response metrics, merged across all selectors.
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        // Schema errors degrade to "no series for this selector" rather
        // than failing the whole request.
        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1689
/// Parameters accepted by the Prometheus parse endpoint handler
/// ([`parse_query`]).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL expression to parse; the handler rejects the request
    /// when it is absent.
    query: Option<String>,
    /// Optional database string; not read by the parse handler.
    db: Option<String>,
}
1695
1696#[axum_macros::debug_handler]
1697#[tracing::instrument(
1698    skip_all,
1699    fields(protocol = "prometheus", request_type = "parse_query")
1700)]
1701pub async fn parse_query(
1702    State(_handler): State<PrometheusHandlerRef>,
1703    Query(params): Query<ParseQuery>,
1704    Extension(_query_ctx): Extension<QueryContext>,
1705    Form(form_params): Form<ParseQuery>,
1706) -> PrometheusJsonResponse {
1707    if let Some(query) = params.query.or(form_params.query) {
1708        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1709        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1710    } else {
1711        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1712    }
1713}
1714
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// A single table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        /// Unique, human-readable case name, interpolated into every
        /// assertion message below — keep names distinct so failures are
        /// unambiguous.
        name: &'static str,
        promql: &'static str,
        expected_metric: Option<&'static str>,
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Single metric cases
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregation and function cases.
            // The three aggregation variants were previously all named
            // "complex aggregation", which made failure messages ambiguous.
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation grouped by non-name label",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation grouped by __name__",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation with without clause",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Same metric binary operations
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Multiple metrics cases
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unless operator cases
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subquery cases
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literal values
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Error cases — `expected_metric`/`expected_type` are unused
            // when `should_error` is true.
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}