// servers/http/prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! prom supplies the Prometheus HTTP API server compliance layer
16
17use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54    SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use serde::de::{self, MapAccess, Visitor};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use session::context::{QueryContext, QueryContextRef};
61use snafu::{Location, OptionExt, ResultExt};
62use store_api::metric_engine_consts::{
63    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
64};
65
66pub use super::result::prometheus_resp::PrometheusJsonResponse;
67use crate::error::{
68    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
69    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
70    UnexpectedResultSnafu,
71};
72use crate::http::header::collect_plan_metrics;
73use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
74use crate::prometheus_handler::PrometheusHandlerRef;
75
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    /// The single sample as `(timestamp, value-as-string)`;
    /// omitted from the JSON output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
83
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    /// All samples of the series, each as `(timestamp, value-as-string)`.
    pub values: Vec<(f64, String)>,
}
90
/// Variants corresponding to [ValueType]
///
/// Serialized untagged so each variant appears as its bare JSON payload,
/// as the Prometheus HTTP API expects.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// Range-query result: per-series sample lists.
    Matrix(Vec<PromSeriesMatrix>),
    /// Instant-query result: per-series single samples.
    Vector(Vec<PromSeriesVector>),
    /// One scalar sample `(timestamp, value-as-string)`; skipped when `None`.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// One string sample `(timestamp, string)`; skipped when `None`.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
100
101impl Default for PromQueryResult {
102    fn default() -> Self {
103        PromQueryResult::Matrix(Default::default())
104    }
105}
106
/// The `data` payload of a Prometheus query response:
/// the result type string (e.g. "matrix", "vector") plus the result itself.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
113
/// A "holder" for the reference([Arc]) to a column name,
/// to help avoiding cloning [String]s when used as a [HashMap] key.
///
/// Cloning a [`Column`] only bumps the [Arc] reference count.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
118
119impl From<&str> for Column {
120    fn from(s: &str) -> Self {
121        Self(Arc::new(s.to_string()))
122    }
123}
124
/// The (untagged) response payload variants of the Prometheus HTTP API endpoints.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    /// Result of an instant or range query.
    PromData(PromData),
    /// Label names (labels endpoint).
    Labels(Vec<String>),
    /// Matched series, one label-set map per series.
    Series(Vec<HashMap<Column, String>>),
    /// Values of a single label.
    LabelValues(Vec<String>),
    /// Prettified PromQL text (format_query endpoint).
    FormatQuery(String),
    /// Server build information.
    BuildInfo(OwnedBuildInfo),
    /// Parsed PromQL AST; serialize-only, never deserialized.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    /// Empty payload.
    #[default]
    None,
}
139
140impl PrometheusResponse {
141    /// Append the other [`PrometheusResponse]`.
142    /// # NOTE
143    ///   Only append matrix and vector results, otherwise just ignore the other response.
144    fn append(&mut self, other: PrometheusResponse) {
145        match (self, other) {
146            (
147                PrometheusResponse::PromData(PromData {
148                    result: PromQueryResult::Matrix(lhs),
149                    ..
150                }),
151                PrometheusResponse::PromData(PromData {
152                    result: PromQueryResult::Matrix(rhs),
153                    ..
154                }),
155            ) => {
156                lhs.extend(rhs);
157            }
158
159            (
160                PrometheusResponse::PromData(PromData {
161                    result: PromQueryResult::Vector(lhs),
162                    ..
163                }),
164                PrometheusResponse::PromData(PromData {
165                    result: PromQueryResult::Vector(rhs),
166                    ..
167                }),
168            ) => {
169                lhs.extend(rhs);
170            }
171            _ => {
172                // TODO(dennis): process other cases?
173            }
174        }
175    }
176
177    pub fn is_none(&self) -> bool {
178        matches!(self, PrometheusResponse::None)
179    }
180}
181
/// Query parameters of the `format_query` endpoint.
///
/// NOTE(review): the visible `format_query` handler extracts its parameters
/// via `InstantQuery`, not this struct — confirm whether this is still used.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    // The PromQL expression to prettify.
    query: Option<String>,
}
186
187#[axum_macros::debug_handler]
188#[tracing::instrument(
189    skip_all,
190    fields(protocol = "prometheus", request_type = "format_query")
191)]
192pub async fn format_query(
193    State(_handler): State<PrometheusHandlerRef>,
194    Query(params): Query<InstantQuery>,
195    Extension(_query_ctx): Extension<QueryContext>,
196    Form(form_params): Form<InstantQuery>,
197) -> PrometheusJsonResponse {
198    let query = params.query.or(form_params.query).unwrap_or_default();
199    match promql_parser::parser::parse(&query) {
200        Ok(expr) => {
201            let pretty = expr.prettify();
202            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
203        }
204        Err(reason) => {
205            let err = InvalidQuerySnafu { reason }.build();
206            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
207        }
208    }
209}
210
/// Query parameters of the `build_info` endpoint — none are accepted.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
213
214#[axum_macros::debug_handler]
215#[tracing::instrument(
216    skip_all,
217    fields(protocol = "prometheus", request_type = "build_info_query")
218)]
219pub async fn build_info_query() -> PrometheusJsonResponse {
220    let build_info = common_version::build_info().clone();
221    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
222}
223
/// Query parameters of the instant-query endpoint.
/// Each field may arrive via the URL query string or the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    // The PromQL expression to evaluate.
    query: Option<String>,
    // Lookback delta; defaults to `DEFAULT_LOOKBACK_STRING` when absent.
    lookback: Option<String>,
    // Evaluation timestamp; defaults to the current server time when absent.
    time: Option<String>,
    // Evaluation timeout. NOTE(review): not read by the visible handlers.
    timeout: Option<String>,
    // Target database; overrides catalog/schema in the query context.
    db: Option<String>,
}
232
/// Helper macro which try to evaluate the expression and return its results.
/// If the evaluation fails, return a `PrometheusJsonResponse` early.
///
/// Must be used inside a function returning `PrometheusJsonResponse`:
/// the error arm performs an early `return`.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Err(err) => {
                return PrometheusJsonResponse::error(
                    StatusCode::InvalidArguments,
                    err.to_string(),
                );
            }
            Ok(res) => res,
        }
    };
}
246
/// Handles Prometheus instant queries (`/api/v1/query`).
/// Query-string parameters take precedence over form-body parameters.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modeled as a range query with start == end.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    // When the metric name is selected by non-equality matchers (e.g. `=~`),
    // resolve the concrete metric names first, then run one query per name
    // and merge the responses.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // No metric matched: return an empty result of the expected type.
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Run all per-metric queries concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Rewrite the expression to select exactly this metric name.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one response, since `metric_names` is non-empty (checked above)
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
338
339/// Executes a single instant query and returns response
340async fn do_instant_query(
341    handler: &PrometheusHandlerRef,
342    prom_query: &PromQuery,
343    query_ctx: QueryContextRef,
344) -> PrometheusJsonResponse {
345    let result = handler.do_query(prom_query, query_ctx).await;
346    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
347        Ok((metric_name, result_type)) => (metric_name, result_type),
348        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
349    };
350    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
351}
352
/// Query parameters of the range-query endpoint.
/// Each field may arrive via the URL query string or the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    // The PromQL expression to evaluate.
    query: Option<String>,
    // Start of the evaluation range.
    start: Option<String>,
    // End of the evaluation range.
    end: Option<String>,
    // Resolution step between evaluated points.
    step: Option<String>,
    // Lookback delta; defaults to `DEFAULT_LOOKBACK_STRING` when absent.
    lookback: Option<String>,
    // Evaluation timeout. NOTE(review): not read by the visible handlers.
    timeout: Option<String>,
    // Target database; overrides catalog/schema in the query context.
    db: Option<String>,
}
363
/// Handles Prometheus range queries (`/api/v1/query_range`).
/// Query-string parameters take precedence over form-body parameters.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    // When the metric name is selected by non-equality matchers (e.g. `=~`),
    // resolve the concrete metric names first, then run one query per name
    // and merge the responses.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            // No metric matched: return an empty matrix result.
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Run all per-metric queries concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Rewrite the expression to select exactly this metric name.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one responses, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
448
449/// Executes a single range query and returns response
450async fn do_range_query(
451    handler: &PrometheusHandlerRef,
452    prom_query: &PromQuery,
453    query_ctx: QueryContextRef,
454) -> PrometheusJsonResponse {
455    let result = handler.do_query(prom_query, query_ctx).await;
456    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
457        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
458        Ok((metric_name, _)) => metric_name,
459    };
460    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
461}
462
/// Repeated `match[]` query parameters, collected into a list of PromQL selectors.
/// Deserialization is custom (see the `Deserialize` impl) to gather repeated keys.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
465
/// Query parameters of the labels endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    // Start of the time range; defaults to yesterday when absent.
    start: Option<String>,
    // End of the time range; defaults to now when absent.
    end: Option<String>,
    // Lookback delta; defaults to `DEFAULT_LOOKBACK_STRING` when absent.
    lookback: Option<String>,
    // Repeated `match[]` series selectors, captured from the flattened map.
    #[serde(flatten)]
    matches: Matches,
    // Target database; overrides catalog/schema in the query context.
    db: Option<String>,
}
475
476// Custom Deserialize method to support parsing repeated match[]
477impl<'de> Deserialize<'de> for Matches {
478    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
479    where
480        D: de::Deserializer<'de>,
481    {
482        struct MatchesVisitor;
483
484        impl<'d> Visitor<'d> for MatchesVisitor {
485            type Value = Vec<String>;
486
487            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
488                formatter.write_str("a string")
489            }
490
491            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
492            where
493                M: MapAccess<'d>,
494            {
495                let mut matches = Vec::new();
496                while let Some((key, value)) = access.next_entry::<String, String>()? {
497                    if key == "match[]" {
498                        matches.push(value);
499                    }
500                }
501                Ok(matches)
502            }
503        }
504        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
505    }
506}
507
/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
///
/// Must be used inside a function returning `PrometheusJsonResponse`:
/// the final arm performs an early `return`.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err)
                if matches!(
                    err.status_code(),
                    StatusCode::TableNotFound | StatusCode::TableColumnNotFound
                ) =>
            {
                // Prometheus won't report error if querying nonexist label and metric
                None
            }
            Err(err) => {
                return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
            }
        }
    };
}
531
/// Handles the Prometheus labels endpoint: returns all label names, either
/// from the schema's tag columns (no matchers) or from the series matched
/// by the given `match[]` selectors (intersected with the tag columns).
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // Query-string matchers win; fall back to form-body matchers.
    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    // Fetch all tag columns. It will be used as white-list for tag names.
    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    // insert the special metric name label
    let _ = labels.insert(METRIC_NAME.to_string());

    // Fetch all columns if no query matcher is provided
    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    // Otherwise, run queries and extract column name from result set.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    // Per-query plan metrics, merged across all matchers.
    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        // Nonexistent tables/columns are silently skipped (Prometheus behavior).
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    // intersect `fetched_labels` with `labels` to filter out non-tag columns
    fetched_labels.retain(|l| labels.contains(l));
    // NOTE(review): `labels` is not read after this point and METRIC_NAME was
    // already inserted above — this insert appears redundant; confirm and remove.
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}
621
622/// Get all tag column name of the given schema
623async fn get_all_column_names(
624    catalog: &str,
625    schema: &str,
626    manager: &CatalogManagerRef,
627) -> std::result::Result<HashSet<String>, catalog::error::Error> {
628    let table_names = manager.table_names(catalog, schema, None).await?;
629
630    let mut labels = HashSet::new();
631    for table_name in table_names {
632        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
633            continue;
634        };
635        for column in table.primary_key_columns() {
636            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
637                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
638            {
639                labels.insert(column.name);
640            }
641        }
642    }
643
644    Ok(labels)
645}
646
/// Converts one `match[]` query result into series entries: every row of the
/// result becomes one `{__name__=..., label=value, ...}` map appended to
/// `series`, keeping only the table's tag (primary key) columns.
/// Plan metrics, when present, are merged into `metrics`.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // fetch tag list
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            // Collect the whole stream up front before converting rows.
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // Write/DDL outputs are not valid for a series query.
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
698
699/// Retrieve labels name from query result
700async fn retrieve_labels_name_from_query_result(
701    result: Result<Output>,
702    labels: &mut HashSet<String>,
703    metrics: &mut HashMap<String, u64>,
704) -> Result<()> {
705    let result = result?;
706    match result.data {
707        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
708        OutputData::Stream(stream) => {
709            let batches = RecordBatches::try_collect(stream)
710                .await
711                .context(CollectRecordbatchSnafu)?;
712            record_batches_to_labels_name(batches, labels)
713        }
714        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
715            reason: "expected data result, but got affected rows".to_string(),
716        }
717        .fail(),
718    }?;
719    if let Some(ref plan) = result.meta.plan {
720        collect_plan_metrics(plan, &mut [metrics]);
721    }
722    Ok(())
723}
724
725fn record_batches_to_series(
726    batches: RecordBatches,
727    series: &mut Vec<HashMap<Column, String>>,
728    table_name: &str,
729    tag_columns: &HashSet<String>,
730) -> Result<()> {
731    for batch in batches.iter() {
732        // project record batch to only contains tag columns
733        let projection = batch
734            .schema
735            .column_schemas()
736            .iter()
737            .enumerate()
738            .filter_map(|(idx, col)| {
739                if tag_columns.contains(&col.name) {
740                    Some(idx)
741                } else {
742                    None
743                }
744            })
745            .collect::<Vec<_>>();
746        let batch = batch
747            .try_project(&projection)
748            .context(CollectRecordbatchSnafu)?;
749
750        let mut writer = RowWriter::new(&batch.schema, table_name);
751        writer.write(batch, series)?;
752    }
753    Ok(())
754}
755
/// Writer from a row in the record batch to a Prometheus time series:
///
/// `{__name__="<metric name>", <label name>="<label value>", ...}`
///
/// The metrics name is the table name; label names are the column names and
/// the label values are the corresponding row values (all are converted to strings).
struct RowWriter {
    /// The template that is to produce a Prometheus time series. It is pre-filled with metrics name
    /// and label names, waiting to be filled by row values afterward.
    template: HashMap<Column, Option<String>>,
    /// The current filling row.
    /// `None` between rows; cloned lazily from `template` on the row's first insert.
    current: Option<HashMap<Column, Option<String>>>,
}
769
770impl RowWriter {
771    fn new(schema: &SchemaRef, table: &str) -> Self {
772        let mut template = schema
773            .column_schemas()
774            .iter()
775            .map(|x| (x.name.as_str().into(), None))
776            .collect::<HashMap<Column, Option<String>>>();
777        template.insert("__name__".into(), Some(table.to_string()));
778        Self {
779            template,
780            current: None,
781        }
782    }
783
784    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
785        let current = self.current.get_or_insert_with(|| self.template.clone());
786        match current.get_mut(&column as &dyn AsColumnRef) {
787            Some(x) => {
788                let _ = x.insert(value.to_string());
789            }
790            None => {
791                let _ = current.insert(column.0.into(), Some(value.to_string()));
792            }
793        }
794    }
795
796    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
797        let column_name = column_schema.name.as_str().into();
798
799        if column_schema.data_type.is_json() {
800            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
801            self.insert(column_name, s);
802        } else {
803            let hex = bytes
804                .iter()
805                .map(|b| format!("{b:02x}"))
806                .collect::<Vec<String>>()
807                .join("");
808            self.insert(column_name, hex);
809        }
810        Ok(())
811    }
812
813    fn finish(&mut self) -> HashMap<Column, String> {
814        let Some(current) = self.current.take() else {
815            return HashMap::new();
816        };
817        current
818            .into_iter()
819            .filter_map(|(k, v)| v.map(|v| (k, v)))
820            .collect()
821    }
822
    /// Renders every row of `record_batch` into a `column -> string` map and
    /// appends it to `series`.
    ///
    /// Non-null cells are stringified according to their Arrow data type and
    /// accumulated via `self.insert`; binary columns go through
    /// `insert_bytes`, which consults the column schema. SQL NULLs become the
    /// literal string "Null". Each completed row is sealed with
    /// `self.finish()`.
    ///
    /// Returns `NotSupported` for Arrow types with no textual mapping here,
    /// and propagates DataFusion errors from List/Struct scalar extraction.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        // Keep the GreptimeDB schema handle: column names (and per-column
        // schemas needed for binary rendering) come from it, not from the
        // Arrow batch itself.
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        // Row-major traversal: build one complete label map per row.
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // Nulls short-circuit: rendered as the literal "Null".
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    // Defensive: Null-typed columns are normally caught by
                    // the `is_null` check above already.
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 => {
                        let array = array.as_string::<i32>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::LargeUtf8 => {
                        let array = array.as_string::<i64>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8View => {
                        let array = array.as_string_view();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Binary payloads: `insert_bytes` decides the textual
                    // form from the column schema (hex encoding as fallback).
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::LargeBinary => {
                        let array = array.as_binary::<i64>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::BinaryView => {
                        let array = array.as_binary_view();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // `Date64` values are milliseconds representation of `Date32` values,
                        // according to its specification. So we convert the `Date64` value here to
                        // the `Date32` value to process them unified.
                        // NOTE(review): integer division truncates toward zero, so pre-epoch
                        // (negative) millisecond values round toward 1970 rather than flooring
                        // to the earlier day — confirm this is intended.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    // Temporal types are rendered in ISO-8601 form.
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    // Nested types fall back to DataFusion's scalar Display.
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    // Anything else is surfaced explicitly rather than
                    // silently dropped.
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            // Seal the row: drop missing values and push the label map.
            series.push(self.finish())
        }
        Ok(())
    }
999}
1000
/// Borrowed view of a `Column` key, used to probe `HashMap<Column, _>` by
/// `&str` (via the `Borrow<dyn AsColumnRef>` bridge below) without
/// allocating an owned `Column`.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` (clippy
/// `derive_partial_eq_without_eq`); the derived `Hash` hashes only the inner
/// string, matching the manual `Hash for dyn AsColumnRef` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ColumnRef<'a>(&'a str);
1003
1004impl<'a> From<&'a str> for ColumnRef<'a> {
1005    fn from(s: &'a str) -> Self {
1006        Self(s)
1007    }
1008}
1009
/// Unifies owned `Column` keys and borrowed [`ColumnRef`] probes so that a
/// `HashMap<Column, _>` can be queried by string without allocating.
///
/// NOTE(review): the method intentionally shadows the name of
/// `std::convert::AsRef::as_ref` at call sites — presumably for terseness;
/// confirm before renaming.
trait AsColumnRef {
    /// Returns a borrowed view of this column key.
    fn as_ref(&self) -> ColumnRef<'_>;
}
1013
1014impl AsColumnRef for Column {
1015    fn as_ref(&self) -> ColumnRef<'_> {
1016        self.0.as_str().into()
1017    }
1018}
1019
1020impl AsColumnRef for ColumnRef<'_> {
1021    fn as_ref(&self) -> ColumnRef<'_> {
1022        *self
1023    }
1024}
1025
/// Equality on the trait object compares the underlying strings. Required so
/// `dyn AsColumnRef` can serve as a hash-map lookup key via `Borrow`.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}
1031
// String equality is reflexive, so the `Eq` contract holds for the trait object.
impl<'a> Eq for dyn AsColumnRef + 'a {}
1033
/// Hashes only the underlying string. For the `Borrow`-based map lookups to
/// be sound, this must agree with `Column`'s own `Hash` implementation —
/// NOTE(review): `Column`'s impl is outside this view; confirm it hashes
/// just the inner string.
impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}
1039
/// Lets a `HashMap<Column, _>` be probed with `&dyn AsColumnRef` (e.g. a
/// stack-allocated [`ColumnRef`]) instead of constructing an owned `Column`
/// per lookup.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1045
1046/// Retrieve labels name from record batches
1047fn record_batches_to_labels_name(
1048    batches: RecordBatches,
1049    labels: &mut HashSet<String>,
1050) -> Result<()> {
1051    let mut column_indices = Vec::new();
1052    let mut field_column_indices = Vec::new();
1053    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1054        if let ConcreteDataType::Float64(_) = column.data_type {
1055            field_column_indices.push(i);
1056        }
1057        column_indices.push(i);
1058    }
1059
1060    if field_column_indices.is_empty() {
1061        return Err(Error::Internal {
1062            err_msg: "no value column found".to_string(),
1063        });
1064    }
1065
1066    for batch in batches.iter() {
1067        let names = column_indices
1068            .iter()
1069            .map(|c| batches.schema().column_name_by_index(*c).to_string())
1070            .collect::<Vec<_>>();
1071
1072        let field_columns = field_column_indices
1073            .iter()
1074            .map(|i| {
1075                let column = batch.column(*i);
1076                column.as_primitive::<Float64Type>()
1077            })
1078            .collect::<Vec<_>>();
1079
1080        for row_index in 0..batch.num_rows() {
1081            // if all field columns are null, skip this row
1082            if field_columns.iter().all(|c| c.is_null(row_index)) {
1083                continue;
1084            }
1085
1086            // if a field is not null, record the tag name and return
1087            names.iter().for_each(|name| {
1088                let _ = labels.insert(name.clone());
1089            });
1090            return Ok(());
1091        }
1092    }
1093    Ok(())
1094}
1095
1096pub(crate) fn retrieve_metric_name_and_result_type(
1097    promql: &str,
1098) -> Result<(Option<String>, ValueType)> {
1099    let promql_expr = promql_parser::parser::parse(promql)
1100        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1101    let metric_name = promql_expr_to_metric_name(&promql_expr);
1102    let result_type = promql_expr.value_type();
1103
1104    Ok((metric_name, result_type))
1105}
1106
1107/// Tries to get catalog and schema from an optional db param. And retrieves
1108/// them from [QueryContext] if they don't present.
1109pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1110    if let Some(db) = db {
1111        parse_catalog_and_schema_from_db_string(db)
1112    } else {
1113        (
1114            ctx.current_catalog().to_string(),
1115            ctx.current_schema().clone(),
1116        )
1117    }
1118}
1119
1120/// Update catalog and schema in [QueryContext] if necessary.
1121pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1122    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1123        ctx.set_current_catalog(catalog);
1124        ctx.set_current_schema(schema);
1125    }
1126}
1127
1128fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1129    let mut metric_names = HashSet::new();
1130    collect_metric_names(expr, &mut metric_names);
1131
1132    // Return the metric name only if there's exactly one unique metric name
1133    if metric_names.len() == 1 {
1134        metric_names.into_iter().next()
1135    } else {
1136        None
1137    }
1138}
1139
/// Recursively collect all metric names from a PromQL expression.
///
/// The accumulated set is cleared — making the overall result "unknown" —
/// whenever an operation may change or drop the `__name__` label:
/// aggregations whose modifier discards `__name__`, unary expressions, and
/// non-set binary operators. For set operators (`and`, `or`, `unless`) only
/// the left-hand side is collected.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                // `by (...)` must list `__name__` for the name to survive.
                Some(LabelModifier::Include(labels)) => {
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                // `without (...)` must NOT list `__name__`.
                Some(LabelModifier::Exclude(labels)) => {
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        // A unary operator invalidates any collected names.
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            if matches!(
                op.id(),
                token::T_LAND // INTERSECT
                    | token::T_LOR // UNION
                    | token::T_LUNLESS // EXCEPT
            ) {
                // Set operations: keep collecting from the left-hand side only.
                collect_metric_names(lhs, metric_names)
            } else {
                // Arithmetic/comparison results carry no reliable metric name.
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                // No literal name: fall back to the first `__name__` matcher.
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        // Function calls: union the names found in every argument.
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1199
1200fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1201where
1202    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1203{
1204    match expr {
1205        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1206        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1207        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1208            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1209        }
1210        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1211        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1212        PromqlExpr::NumberLiteral(_) => None,
1213        PromqlExpr::StringLiteral(_) => None,
1214        PromqlExpr::Extension(_) => None,
1215        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1216        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1217            let VectorSelector { name, matchers, .. } = vs;
1218
1219            f(name, matchers)
1220        }
1221        PromqlExpr::Call(Call { args, .. }) => args
1222            .args
1223            .iter()
1224            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1225    }
1226}
1227
1228/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
1229fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
1230    find_metric_name_and_matchers(expr, |name, matchers| {
1231        // Has name, ignore the matchers
1232        if name.is_some() {
1233            return None;
1234        }
1235
1236        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
1237        Some(matchers.find_matchers(METRIC_NAME))
1238    })
1239    .map(|matchers| {
1240        matchers
1241            .into_iter()
1242            .filter(|m| !matches!(m.op, MatchOp::Equal))
1243            .collect::<Vec<_>>()
1244    })
1245}
1246
1247/// Update the `__name__` matchers in expression into special value
1248/// Returns the updated expression.
1249fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1250    match expr {
1251        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1252            update_metric_name_matcher(expr, metric_name)
1253        }
1254        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1255        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1256            update_metric_name_matcher(lhs, metric_name);
1257            update_metric_name_matcher(rhs, metric_name);
1258        }
1259        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1260        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1261            update_metric_name_matcher(expr, metric_name)
1262        }
1263        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1264            if name.is_some() {
1265                return;
1266            }
1267
1268            for m in &mut matchers.matchers {
1269                if m.name == METRIC_NAME {
1270                    m.op = MatchOp::Equal;
1271                    m.value = metric_name.to_string();
1272                }
1273            }
1274        }
1275        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1276            let VectorSelector { name, matchers, .. } = vs;
1277            if name.is_some() {
1278                return;
1279            }
1280
1281            for m in &mut matchers.matchers {
1282                if m.name == METRIC_NAME {
1283                    m.op = MatchOp::Equal;
1284                    m.value = metric_name.to_string();
1285                }
1286            }
1287        }
1288        PromqlExpr::Call(Call { args, .. }) => {
1289            args.args.iter_mut().for_each(|e| {
1290                update_metric_name_matcher(e, metric_name);
1291            });
1292        }
1293        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1294    }
1295}
1296
/// Query parameters for the Prometheus label-values endpoint
/// (`/api/v1/label/{label_name}/values`).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    /// Inclusive range start (RFC3339 or Unix timestamp); defaults to 24h ago.
    start: Option<String>,
    /// Inclusive range end; defaults to now.
    end: Option<String>,
    /// Lookback delta — appears unused by the label-values handler; confirm.
    lookback: Option<String>,
    /// Repeated `match[]` series selectors, flattened from the request.
    #[serde(flatten)]
    matches: Matches,
    /// Target database in `catalog-schema` form.
    db: Option<String>,
    /// Maximum number of values to return; `0` or absent means unlimited.
    limit: Option<usize>,
}
1307
/// Handler for the Prometheus `/api/v1/label/{label_name}/values` endpoint.
///
/// A few label names are answered straight from catalog metadata:
/// `__name__` lists metric (table) names, `__field__` lists field column
/// names, and `__schema__`/`__database__` list schema names. Any other label
/// requires `match[]` selectors and is resolved through the query engine.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    // Point the query context at the requested db before doing anything else.
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Schema-level errors degrade to an empty result instead of failing.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    // Generic labels need at least one series selector to scope the scan.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    // Default window: the last 24 hours.
    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        // Each selector must be a plain vector selector carrying a metric name.
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use and filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    // The set deduplicated across selectors; present sorted and capped.
    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1407
/// Caps `label_values` at `limit` entries. A `limit` of zero or `None` means
/// "no limit", matching Prometheus's `limit` parameter semantics.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        Some(limit) if limit > 0 && label_values.len() >= limit => {
            label_values.truncate(limit);
        }
        _ => {}
    }
}
1416
1417/// Take metric name from the [VectorSelector].
1418/// It takes the name in the selector or removes the name matcher.
1419fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1420    if let Some(name) = selector.name.take() {
1421        return Some(name);
1422    }
1423
1424    let (pos, matcher) = selector
1425        .matchers
1426        .matchers
1427        .iter()
1428        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1429    let name = matcher.value.clone();
1430    // We need to remove the name matcher to avoid using it as a filter in query.
1431    selector.matchers.matchers.remove(pos);
1432
1433    Some(name)
1434}
1435
/// Lists metric (table) names of the current schema for `__name__` queries.
///
/// Only metric-engine tables — those carrying the logical-table metadata
/// key — are considered. Matcher support is intentionally minimal: only the
/// FIRST `match[]` entry is parsed, and only its `__name__` matcher is used;
/// `=` and `=~` filter names, while `!=`/`!~` keep every name.
async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // we only provide very limited support for matcher against __name__
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // skip non-prometheus (non-metricengine) tables for __name__ query
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // != and !~ are not supported: a vector selector must
                    // contain at least one non-empty matcher, so every name
                    // passes through unfiltered here.
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}
1507
/// Collects field (value) column names for `__field__` label queries.
///
/// Without `match[]` entries, every table in the current schema contributes
/// its field columns. Otherwise each entry is used verbatim as a table name
/// (not parsed as PromQL) and must exist, or `TableNotFound` is returned.
async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // query all tables if no matcher is provided
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.clone(),
                table: table_name.clone(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}
1549
/// Collects schema (database) names for `__schema__`/`__database__` label
/// queries.
///
/// A candidate schema is kept when every `match[]` entry that resolves to a
/// single metric name has a table of that name in the schema; entries that
/// don't resolve to exactly one metric name are ignored for filtering.
async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await
        .context(CatalogSnafu)?;

    for schema in candidate_schemas {
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await
                    .context(CatalogSnafu)?;
                if !exists {
                    // One required metric is missing: drop this schema.
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}
1587
1588/// Try to parse and extract the name of referenced metric from the promql query.
1589///
1590/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1591/// Multiple references to the same metric are allowed.
1592fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1593    let promql_expr = promql_parser::parser::parse(query).ok()?;
1594    promql_expr_to_metric_name(&promql_expr)
1595}
1596
/// Query parameters for the Prometheus series endpoint (`/api/v1/series`).
/// Accepted from either the query string or a form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Inclusive range start (RFC3339 or Unix timestamp); defaults to 24h ago.
    start: Option<String>,
    /// Inclusive range end; defaults to now.
    end: Option<String>,
    /// Lookback delta; defaults to the standard Prometheus lookback.
    lookback: Option<String>,
    /// Repeated `match[]` series selectors, flattened from the request.
    #[serde(flatten)]
    matches: Matches,
    /// Target database in `catalog-schema` form.
    db: Option<String>,
}
1606
/// Handler for the Prometheus `/api/v1/series` endpoint.
///
/// Parameters may arrive via query string or form body; the query-string
/// value wins field-by-field. Each `match[]` selector is evaluated over the
/// requested range and the label sets of matched series are returned,
/// together with per-query execution metrics.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    // Selectors: query string first, then form body.
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    // Default window: the last 24 hours, with the standard lookback delta.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        // Best-effort table name; empty when the selector is ambiguous.
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        // Collect matched series; schema-level errors are absorbed by the
        // `handle_schema_err!` macro (defined elsewhere in this file).
        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    // Attach accumulated per-query metrics to the JSON response.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1687
/// Request parameters of the Prometheus-compatible parse query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL expression to parse; required (see `parse_query`).
    query: Option<String>,
    /// Optional database string; currently unused by `parse_query`.
    db: Option<String>,
}
1693
1694#[axum_macros::debug_handler]
1695#[tracing::instrument(
1696    skip_all,
1697    fields(protocol = "prometheus", request_type = "parse_query")
1698)]
1699pub async fn parse_query(
1700    State(_handler): State<PrometheusHandlerRef>,
1701    Query(params): Query<ParseQuery>,
1702    Extension(_query_ctx): Extension<QueryContext>,
1703    Form(form_params): Form<ParseQuery>,
1704) -> PrometheusJsonResponse {
1705    if let Some(query) = params.query.or(form_params.query) {
1706        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1707        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1708    } else {
1709        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1710    }
1711}
1712
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        /// Human-readable case name, used in assertion messages.
        name: &'static str,
        /// The PromQL expression under test.
        promql: &'static str,
        /// Expected extracted metric name; `None` when no single metric
        /// can be derived from the expression.
        expected_metric: Option<&'static str>,
        /// Expected PromQL result type (vector/matrix/scalar/string).
        expected_type: ValueType,
        /// Whether parsing/extraction is expected to fail.
        should_error: bool,
    }

    /// Table-driven test covering metric-name extraction and result-type
    /// detection across selectors, aggregations, binary ops, subqueries,
    /// literals, and malformed queries.
    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Single metric cases
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregation and function cases
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                // `by (instance)` drops __name__, so no metric is derivable.
                name: "complex aggregation",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                // Grouping by __name__ preserves the metric name.
                name: "complex aggregation",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                // `without (instance)` keeps __name__, so the name survives.
                name: "complex aggregation",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Same metric binary operations
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Multiple metrics cases
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unless operator cases: the result can only contain series from
            // the left-hand side, so the left metric name is kept.
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subquery cases
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literal values
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Error cases (expected_metric/expected_type are unused here).
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}