// servers/http/prometheus.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! prom supplies the Prometheus HTTP API Server compliance
16
17use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54    SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use serde::de::{self, MapAccess, Visitor};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use session::context::{QueryContext, QueryContextRef};
61use snafu::{Location, OptionExt, ResultExt};
62use store_api::metric_engine_consts::{
63    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
64};
65
66pub use super::result::prometheus_resp::PrometheusJsonResponse;
67use crate::error::{
68    CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, InvalidQuerySnafu,
69    NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, UnexpectedResultSnafu,
70};
71use crate::http::header::collect_plan_metrics;
72use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label};
73use crate::prometheus_handler::PrometheusHandlerRef;
74
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    // Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    // Single (timestamp, value) sample; omitted from the JSON output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
82
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    // Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    // The (timestamp, value) samples of this series.
    pub values: Vec<(f64, String)>,
}
89
/// Variants corresponding to [ValueType]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    // Range-query result: one entry per series, each carrying many samples.
    Matrix(Vec<PromSeriesMatrix>),
    // Instant-query result: one entry per series, each carrying one sample.
    Vector(Vec<PromSeriesVector>),
    // A single scalar sample.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    // A single string sample.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
99
100impl Default for PromQueryResult {
101    fn default() -> Self {
102        PromQueryResult::Matrix(Default::default())
103    }
104}
105
/// The `data` object of a Prometheus query response.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    // Stringified [ValueType] of `result` (set from `ValueType::to_string()` by the handlers).
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}
112
/// A "holder" for the reference([Arc]) to a column name,
/// to help avoiding cloning [String]s when used as a [HashMap] key.
///
/// Cloning a [Column] only bumps the [Arc]'s reference count.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
117
118impl From<&str> for Column {
119    fn from(s: &str) -> Self {
120        Self(Arc::new(s.to_string()))
121    }
122}
123
/// Payload variants of a Prometheus HTTP API response.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    // Query result data (instant and range queries).
    PromData(PromData),
    // Label names (labels query).
    Labels(Vec<String>),
    // Per-series label sets.
    Series(Vec<HashMap<Column, String>>),
    // Values of a single label.
    LabelValues(Vec<String>),
    // Prettified PromQL text (format_query endpoint).
    FormatQuery(String),
    // Server build metadata (build_info_query endpoint).
    BuildInfo(OwnedBuildInfo),
    // Parsed PromQL AST; serialize-only, cannot be deserialized back.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
138
139impl PrometheusResponse {
140    /// Append the other [`PrometheusResponse]`.
141    /// # NOTE
142    ///   Only append matrix and vector results, otherwise just ignore the other response.
143    fn append(&mut self, other: PrometheusResponse) {
144        match (self, other) {
145            (
146                PrometheusResponse::PromData(PromData {
147                    result: PromQueryResult::Matrix(lhs),
148                    ..
149                }),
150                PrometheusResponse::PromData(PromData {
151                    result: PromQueryResult::Matrix(rhs),
152                    ..
153                }),
154            ) => {
155                lhs.extend(rhs);
156            }
157
158            (
159                PrometheusResponse::PromData(PromData {
160                    result: PromQueryResult::Vector(lhs),
161                    ..
162                }),
163                PrometheusResponse::PromData(PromData {
164                    result: PromQueryResult::Vector(rhs),
165                    ..
166                }),
167            ) => {
168                lhs.extend(rhs);
169            }
170            _ => {
171                // TODO(dennis): process other cases?
172            }
173        }
174    }
175
176    pub fn is_none(&self) -> bool {
177        matches!(self, PrometheusResponse::None)
178    }
179}
180
/// Query-string / form parameters of the `format_query` endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    // The PromQL expression to prettify.
    query: Option<String>,
}
185
186#[axum_macros::debug_handler]
187#[tracing::instrument(
188    skip_all,
189    fields(protocol = "prometheus", request_type = "format_query")
190)]
191pub async fn format_query(
192    State(_handler): State<PrometheusHandlerRef>,
193    Query(params): Query<InstantQuery>,
194    Extension(_query_ctx): Extension<QueryContext>,
195    Form(form_params): Form<InstantQuery>,
196) -> PrometheusJsonResponse {
197    let query = params.query.or(form_params.query).unwrap_or_default();
198    match promql_parser::parser::parse(&query) {
199        Ok(expr) => {
200            let pretty = expr.prettify();
201            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
202        }
203        Err(reason) => {
204            let err = InvalidQuerySnafu { reason }.build();
205            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
206        }
207    }
208}
209
/// Parameters of the `build_info_query` endpoint (none are expected).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
212
213#[axum_macros::debug_handler]
214#[tracing::instrument(
215    skip_all,
216    fields(protocol = "prometheus", request_type = "build_info_query")
217)]
218pub async fn build_info_query() -> PrometheusJsonResponse {
219    let build_info = common_version::build_info().clone();
220    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
221}
222
/// Query-string / form parameters accepted by the instant-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    // The PromQL expression to evaluate.
    query: Option<String>,
    // Lookback delta; falls back to [DEFAULT_LOOKBACK_STRING] when absent.
    lookback: Option<String>,
    // Evaluation timestamp; the handler defaults it to the current server time.
    time: Option<String>,
    // Accepted for Prometheus API compatibility; not read by the visible handler.
    timeout: Option<String>,
    // Target database string, parsed into catalog and schema.
    db: Option<String>,
}
231
/// Helper macro which try to evaluate the expression and return its results.
/// If the evaluation fails, return a `PrometheusJsonResponse` early.
///
/// The error is reported with `StatusCode::InvalidArguments` and the error's
/// `Display` text as the message; the macro must therefore be used inside a
/// function returning `PrometheusJsonResponse`.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
245
/// Handles a Prometheus instant query.
///
/// An instant query is executed as a degenerate range query where start, end
/// and step all collapse onto the single evaluation timestamp. When the query
/// selects metric names with non-equality matchers (e.g. `{__name__=~"..."}`),
/// the matching metric names are resolved first and one sub-query is issued
/// per metric, then the results are merged into a single response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    // Fan out to one sub-query per concrete metric when the expression matches
    // metric names by regex / negation rather than by equality.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        // No metric matched: answer with an empty result of the proper type.
        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Run all per-metric sub-queries concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Rewrite the name matcher to select exactly this metric.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one response, `metric_names` was checked non-empty above.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
337
338/// Executes a single instant query and returns response
339async fn do_instant_query(
340    handler: &PrometheusHandlerRef,
341    prom_query: &PromQuery,
342    query_ctx: QueryContextRef,
343) -> PrometheusJsonResponse {
344    let result = handler.do_query(prom_query, query_ctx).await;
345    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
346        Ok((metric_name, result_type)) => (metric_name, result_type),
347        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
348    };
349    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
350}
351
/// Query-string / form parameters accepted by the range-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    // The PromQL expression to evaluate.
    query: Option<String>,
    // Inclusive start timestamp of the evaluated range.
    start: Option<String>,
    // Inclusive end timestamp of the evaluated range.
    end: Option<String>,
    // Resolution step between evaluated points.
    step: Option<String>,
    // Lookback delta; falls back to [DEFAULT_LOOKBACK_STRING] when absent.
    lookback: Option<String>,
    // Accepted for Prometheus API compatibility; not read by the visible handler.
    timeout: Option<String>,
    // Target database string, parsed into catalog and schema.
    db: Option<String>,
}
362
/// Handles a Prometheus range query.
///
/// Mirrors [instant_query]: when the expression selects metric names with
/// non-equality matchers, the matching metric names are resolved first, one
/// sub-query is issued per metric, and the matrix results are merged.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    // Query-string parameters take precedence over form parameters.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    // Fan out to one sub-query per concrete metric when the expression matches
    // metric names by regex / negation rather than by equality.
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        // No metric matched: answer with an empty matrix.
        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Run all per-metric sub-queries concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Rewrite the name matcher to select exactly this metric.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one responses, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
447
448/// Executes a single range query and returns response
449async fn do_range_query(
450    handler: &PrometheusHandlerRef,
451    prom_query: &PromQuery,
452    query_ctx: QueryContextRef,
453) -> PrometheusJsonResponse {
454    let result = handler.do_query(prom_query, query_ctx).await;
455    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
456        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
457        Ok((metric_name, _)) => metric_name,
458    };
459    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
460}
461
/// Values of the repeated `match[]` query parameter.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
464
/// Query-string / form parameters accepted by the labels endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    // Inclusive start of the considered time range; defaults to yesterday.
    start: Option<String>,
    // Inclusive end of the considered time range; defaults to now.
    end: Option<String>,
    // Lookback delta; falls back to [DEFAULT_LOOKBACK_STRING] when absent.
    lookback: Option<String>,
    // Repeated `match[]` series selectors (see [Matches]).
    #[serde(flatten)]
    matches: Matches,
    // Target database string, parsed into catalog and schema.
    db: Option<String>,
}
474
475// Custom Deserialize method to support parsing repeated match[]
476impl<'de> Deserialize<'de> for Matches {
477    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
478    where
479        D: de::Deserializer<'de>,
480    {
481        struct MatchesVisitor;
482
483        impl<'d> Visitor<'d> for MatchesVisitor {
484            type Value = Vec<String>;
485
486            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
487                formatter.write_str("a string")
488            }
489
490            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
491            where
492                M: MapAccess<'d>,
493            {
494                let mut matches = Vec::new();
495                while let Some((key, value)) = access.next_entry::<String, String>()? {
496                    if key == "match[]" {
497                        matches.push(value);
498                    }
499                }
500                Ok(matches)
501            }
502        }
503        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
504    }
505}
506
/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
///
/// Must be used inside a function returning `PrometheusJsonResponse` because
/// the error arm `return`s from the enclosing function.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Prometheus won't report error if querying nonexist label and metric
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
530
/// Handles a Prometheus labels query.
///
/// Without `match[]` selectors it returns every tag column name of the
/// schema (plus `__name__`). With selectors it runs each selector as a
/// PromQL query and returns the label names present in the results,
/// filtered down to known tag columns.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // Query-string matchers take precedence over form matchers.
    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    // Fetch all tag columns. It will be used as white-list for tag names.
    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    // insert the special metric name label
    let _ = labels.insert(METRIC_NAME.to_string());

    // Fetch all columns if no query matcher is provided
    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    // Otherwise, run queries and extract column name from result set.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    // merge_map accumulates per-query plan metrics for the response metadata.
    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        // Missing tables/columns are silently skipped, matching Prometheus behavior.
        handle_schema_err!(
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        );
    }

    // intersect `fetched_labels` with `labels` to filter out non-tag columns
    fetched_labels.retain(|l| labels.contains(l));
    // NOTE(review): `labels` is not read after this point and METRIC_NAME was
    // already inserted above — this insert looks redundant; confirm intent.
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}
620
621/// Get all tag column name of the given schema
622async fn get_all_column_names(
623    catalog: &str,
624    schema: &str,
625    manager: &CatalogManagerRef,
626) -> std::result::Result<HashSet<String>, catalog::error::Error> {
627    let table_names = manager.table_names(catalog, schema, None).await?;
628
629    let mut labels = HashSet::new();
630    for table_name in table_names {
631        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
632            continue;
633        };
634        for column in table.primary_key_columns() {
635            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
636                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
637            {
638                labels.insert(column.name);
639            }
640        }
641    }
642
643    Ok(labels)
644}
645
/// Converts a query result into Prometheus series entries appended to `series`.
///
/// The table's primary-key columns are looked up first and used as the tag
/// whitelist; plan execution metrics, when available, are merged into `metrics`.
async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<Column, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // fetch tag list
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // Streaming results are collected fully before conversion.
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        // A write-style result is a logic error for a series query.
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}
696
697/// Retrieve labels name from query result
698async fn retrieve_labels_name_from_query_result(
699    result: Result<Output>,
700    labels: &mut HashSet<String>,
701    metrics: &mut HashMap<String, u64>,
702) -> Result<()> {
703    let result = result?;
704    match result.data {
705        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
706        OutputData::Stream(stream) => {
707            let batches = RecordBatches::try_collect(stream)
708                .await
709                .context(CollectRecordbatchSnafu)?;
710            record_batches_to_labels_name(batches, labels)
711        }
712        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
713            reason: "expected data result, but got affected rows".to_string(),
714        }
715        .fail(),
716    }?;
717    if let Some(ref plan) = result.meta.plan {
718        collect_plan_metrics(plan, &mut [metrics]);
719    }
720    Ok(())
721}
722
723fn record_batches_to_series(
724    batches: RecordBatches,
725    series: &mut Vec<HashMap<Column, String>>,
726    table_name: &str,
727    tag_columns: &HashSet<String>,
728) -> Result<()> {
729    for batch in batches.iter() {
730        // project record batch to only contains tag columns
731        let projection = batch
732            .schema
733            .column_schemas()
734            .iter()
735            .enumerate()
736            .filter_map(|(idx, col)| {
737                if tag_columns.contains(&col.name) {
738                    Some(idx)
739                } else {
740                    None
741                }
742            })
743            .collect::<Vec<_>>();
744        let batch = batch
745            .try_project(&projection)
746            .context(CollectRecordbatchSnafu)?;
747
748        let mut writer = RowWriter::new(&batch.schema, table_name);
749        writer.write(batch, series)?;
750    }
751    Ok(())
752}
753
/// Writer from a row in the record batch to a Prometheus time series:
///
/// `{__name__="<metric name>", <label name>="<label value>", ...}`
///
/// The metrics name is the table name; label names are the column names and
/// the label values are the corresponding row values (all are converted to strings).
struct RowWriter {
    /// The template that is to produce a Prometheus time series. It is pre-filled with metrics name
    /// and label names, waiting to be filled by row values afterward.
    template: HashMap<Column, Option<String>>,
    /// The current filling row.
    current: Option<HashMap<Column, Option<String>>>,
}
767
768impl RowWriter {
769    fn new(schema: &SchemaRef, table: &str) -> Self {
770        let mut template = schema
771            .column_schemas()
772            .iter()
773            .map(|x| (x.name.as_str().into(), None))
774            .collect::<HashMap<Column, Option<String>>>();
775        template.insert("__name__".into(), Some(table.to_string()));
776        Self {
777            template,
778            current: None,
779        }
780    }
781
    /// Sets `column` to `value` in the row currently being built.
    ///
    /// The row is lazily cloned from the template on first insert; columns
    /// absent from the template are added on the fly.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // NOTE(review): `AsColumnRef` (defined elsewhere) appears to allow using
        // `ColumnRef` as a borrowed map key without allocating — confirm.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }
793
794    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
795        let column_name = column_schema.name.as_str().into();
796
797        if column_schema.data_type.is_json() {
798            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
799            self.insert(column_name, s);
800        } else {
801            let hex = bytes
802                .iter()
803                .map(|b| format!("{b:02x}"))
804                .collect::<Vec<String>>()
805                .join("");
806            self.insert(column_name, hex);
807        }
808        Ok(())
809    }
810
811    fn finish(&mut self) -> HashMap<Column, String> {
812        let Some(current) = self.current.take() else {
813            return HashMap::new();
814        };
815        current
816            .into_iter()
817            .filter_map(|(k, v)| v.map(|v| (k, v)))
818            .collect()
819    }
820
    /// Convert every row of `record_batch` into a label map (column name ->
    /// stringified cell value) and append each map to `series`.
    ///
    /// Returns a `NotSupported` error when a column's Arrow data type has no
    /// string rendering here.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // Null cells are rendered as the literal string "Null".
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                // Dispatch on the Arrow data type to stringify the cell.
                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Integer and float types: downcast and insert the raw value.
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
                        let v = datatypes::arrow_array::string_array_value(array, i);
                        self.insert(column, v);
                    }
                    // Binary data: `insert_bytes` decides between a string
                    // rendering and a hex encoding based on the column schema.
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                        let v = datatypes::arrow_array::binary_array_value(array, i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // `Date64` values are milliseconds representation of `Date32` values,
                        // according to its specification. So we convert the `Date64` value here to
                        // the `Date32` value to process them unified.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    // Temporal types are rendered in ISO 8601 form.
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    // Nested types go through DataFusion's ScalarValue display.
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            // Seal the row: drop unfilled columns and append the map.
            series.push(self.finish())
        }
        Ok(())
    }
973}
974
/// Borrowed, `Copy` view over a column name. Used as a map-lookup key so an
/// owned `Column` key can be probed with a plain `&str` without allocating.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);

impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}
983
/// Unifies owned `Column` keys and borrowed [`ColumnRef`] lookups behind one
/// trait object, enabling `Borrow<dyn AsColumnRef>`-based map probing.
trait AsColumnRef {
    /// Returns a borrowed view of the column name.
    fn as_ref(&self) -> ColumnRef<'_>;
}

impl AsColumnRef for Column {
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}

impl AsColumnRef for ColumnRef<'_> {
    // A `ColumnRef` is already the borrowed view; just copy it.
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}
999
// Equality and hashing for `dyn AsColumnRef` delegate to the underlying
// column-name string, so `Column` and `ColumnRef` compare and hash
// identically — required for `Borrow`-based `HashMap` lookups to be sound
// (Borrow contract: `x == y` must imply `hash(x) == hash(y)`).
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}

impl<'a> Eq for dyn AsColumnRef + 'a {}

impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash only the name string, matching `Column`'s key semantics.
        self.as_ref().0.hash(state);
    }
}
1013
// Lets a `HashMap<Column, _>` be queried with a `&dyn AsColumnRef` (e.g. a
// `ColumnRef` built from a `&str`) instead of constructing an owned `Column`.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1019
1020/// Retrieve labels name from record batches
1021fn record_batches_to_labels_name(
1022    batches: RecordBatches,
1023    labels: &mut HashSet<String>,
1024) -> Result<()> {
1025    let mut column_indices = Vec::new();
1026    let mut field_column_indices = Vec::new();
1027    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1028        if let ConcreteDataType::Float64(_) = column.data_type {
1029            field_column_indices.push(i);
1030        }
1031        column_indices.push(i);
1032    }
1033
1034    if field_column_indices.is_empty() {
1035        return Err(Error::Internal {
1036            err_msg: "no value column found".to_string(),
1037        });
1038    }
1039
1040    for batch in batches.iter() {
1041        let names = column_indices
1042            .iter()
1043            .map(|c| batches.schema().column_name_by_index(*c).to_string())
1044            .collect::<Vec<_>>();
1045
1046        let field_columns = field_column_indices
1047            .iter()
1048            .map(|i| {
1049                let column = batch.column(*i);
1050                column.as_primitive::<Float64Type>()
1051            })
1052            .collect::<Vec<_>>();
1053
1054        for row_index in 0..batch.num_rows() {
1055            // if all field columns are null, skip this row
1056            if field_columns.iter().all(|c| c.is_null(row_index)) {
1057                continue;
1058            }
1059
1060            // if a field is not null, record the tag name and return
1061            names.iter().for_each(|name| {
1062                let _ = labels.insert(name.clone());
1063            });
1064            return Ok(());
1065        }
1066    }
1067    Ok(())
1068}
1069
1070pub(crate) fn retrieve_metric_name_and_result_type(
1071    promql: &str,
1072) -> Result<(Option<String>, ValueType)> {
1073    let promql_expr = promql_parser::parser::parse(promql)
1074        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1075    let metric_name = promql_expr_to_metric_name(&promql_expr);
1076    let result_type = promql_expr.value_type();
1077
1078    Ok((metric_name, result_type))
1079}
1080
1081/// Tries to get catalog and schema from an optional db param. And retrieves
1082/// them from [QueryContext] if they don't present.
1083pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1084    if let Some(db) = db {
1085        parse_catalog_and_schema_from_db_string(db)
1086    } else {
1087        (
1088            ctx.current_catalog().to_string(),
1089            ctx.current_schema().clone(),
1090        )
1091    }
1092}
1093
1094/// Update catalog and schema in [QueryContext] if necessary.
1095pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1096    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1097        ctx.set_current_catalog(catalog);
1098        ctx.set_current_schema(schema);
1099    }
1100}
1101
1102fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1103    let mut metric_names = HashSet::new();
1104    collect_metric_names(expr, &mut metric_names);
1105
1106    // Return the metric name only if there's exactly one unique metric name
1107    if metric_names.len() == 1 {
1108        metric_names.into_iter().next()
1109    } else {
1110        None
1111    }
1112}
1113
/// Recursively collect all metric names from a PromQL expression.
///
/// Clears `metric_names` (making the overall result "ambiguous") whenever a
/// sub-expression can change which metric the series belong to: unary
/// negation, most binary operators, or an aggregation whose grouping drops
/// the `__name__` label.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                // `by (...)` that does NOT keep `__name__` drops the name.
                Some(LabelModifier::Include(labels))
                    if !labels.labels.contains(&METRIC_NAME.to_string()) =>
                {
                    metric_names.clear();
                    return;
                }
                // `without (...)` that lists `__name__` drops it too.
                Some(LabelModifier::Exclude(labels))
                    if labels.labels.contains(&METRIC_NAME.to_string()) =>
                {
                    metric_names.clear();
                    return;
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators keep the left side's series identity; all other
            // binary operators produce derived series without a metric name.
            if matches!(
                op.id(),
                token::T_LAND // INTERSECT
                    | token::T_LOR // UNION
                    | token::T_LUNLESS // EXCEPT
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Prefer the explicit name; otherwise fall back to the first
            // `__name__` matcher, e.g. `{__name__="up"}`.
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1173
/// Walk the expression tree and apply `f` to each selector's
/// `(name, matchers)` pair, returning the first `Some` result.
///
/// Traversal is depth-first; for binary expressions the left side is
/// searched before the right.
fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        // Literals and extensions carry no selector.
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}
1201
/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
///
/// Only selectors WITHOUT an explicit metric name are considered; returns
/// `None` when the first selector found carries a name. The returned vector
/// may be empty when all `__name__` matchers are equality matchers.
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        // Has name, ignore the matchers
        if name.is_some() {
            return None;
        }

        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .collect::<Vec<_>>()
    })
}
1220
1221/// Update the `__name__` matchers in expression into special value
1222/// Returns the updated expression.
1223fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1224    match expr {
1225        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1226            update_metric_name_matcher(expr, metric_name)
1227        }
1228        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1229        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1230            update_metric_name_matcher(lhs, metric_name);
1231            update_metric_name_matcher(rhs, metric_name);
1232        }
1233        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1234        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1235            update_metric_name_matcher(expr, metric_name)
1236        }
1237        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1238            if name.is_some() {
1239                return;
1240            }
1241
1242            for m in &mut matchers.matchers {
1243                if m.name == METRIC_NAME {
1244                    m.op = MatchOp::Equal;
1245                    m.value = metric_name.to_string();
1246                }
1247            }
1248        }
1249        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1250            let VectorSelector { name, matchers, .. } = vs;
1251            if name.is_some() {
1252                return;
1253            }
1254
1255            for m in &mut matchers.matchers {
1256                if m.name == METRIC_NAME {
1257                    m.op = MatchOp::Equal;
1258                    m.value = metric_name.to_string();
1259                }
1260            }
1261        }
1262        PromqlExpr::Call(Call { args, .. }) => {
1263            args.args.iter_mut().for_each(|e| {
1264                update_metric_name_matcher(e, metric_name);
1265            });
1266        }
1267        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1268    }
1269}
1270
/// Query parameters accepted by the Prometheus label-values endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    // Time range bounds; the handler defaults them to yesterday..now.
    start: Option<String>,
    end: Option<String>,
    // Parsed for API compatibility but not used by `label_values_query`.
    lookback: Option<String>,
    // `match[]` series selectors, flattened from the query string.
    #[serde(flatten)]
    matches: Matches,
    // Optional database in `catalog-schema` string form.
    db: Option<String>,
    // Maximum number of values returned; absent or `0` means unlimited.
    limit: Option<usize>,
}
1281
/// Handler for the Prometheus label-values HTTP API.
///
/// Special labels are answered from catalog metadata: `__name__` returns
/// table names, the field-name label returns field columns, and
/// database-selection labels return schema names. Any other label is
/// resolved by querying distinct label values per `match[]` selector.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time for this request on drop.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    // `__name__`: answer with (metric-engine) table names from the catalog.
    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Field-name label: answer with field column names; schema errors
        // degrade to an empty result instead of failing the request.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if is_database_selection_label(&label_name) {
        // Database-selection labels: answer with schema names.
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    // Ordinary labels require at least one `match[]` selector.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    // Each selector must be a plain vector selector with a metric name;
    // distinct label values are unioned across selectors.
    for query in queries {
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use and filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1381
/// Truncate `label_values` to at most `limit` entries.
///
/// A missing limit, or a limit of zero, leaves the vector untouched.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        Some(limit) if limit > 0 && label_values.len() >= limit => {
            label_values.truncate(limit);
        }
        _ => {}
    }
}
1390
1391/// Take metric name from the [VectorSelector].
1392/// It takes the name in the selector or removes the name matcher.
1393fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1394    if let Some(name) = selector.name.take() {
1395        return Some(name);
1396    }
1397
1398    let (pos, matcher) = selector
1399        .matchers
1400        .matchers
1401        .iter()
1402        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1403    let name = matcher.value.clone();
1404    // We need to remove the name matcher to avoid using it as a filter in query.
1405    selector.matchers.matchers.remove(pos);
1406
1407    Some(name)
1408}
1409
/// List metric-engine table names in the current catalog/schema, optionally
/// filtered by a `__name__` matcher taken from the FIRST `match[]` entry.
///
/// Only `=` and `=~` matchers filter; `!=`/`!~` are accepted but ignored.
/// Results are sorted ascending.
async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // we only provide very limited support for matcher against __name__
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table?;
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // skip non-prometheus (non-metricengine) tables for __name__ query
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // != and !~ are not supported:
                    // vector must contains at least one non-empty matcher
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}
1481
/// Collect the set of field column names for the given tables.
///
/// With an empty `matches`, every table of the current catalog/schema is
/// scanned; otherwise each entry is treated as a table name and a missing
/// table yields a `TableNotFound` error.
async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // query all tables if no matcher is provided
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.clone(),
                table: table_name.clone(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}
1522
/// List schema names in the current catalog, keeping only the schemas that
/// contain every metric referenced by the `matches` selectors.
///
/// Selectors that fail to parse (or reference no single metric) impose no
/// constraint. Results are sorted ascending.
async fn retrieve_schema_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let mut schemas = Vec::new();
    let catalog = query_ctx.current_catalog();

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
        .await?;

    for schema in candidate_schemas {
        // A schema qualifies only if every matched metric exists in it.
        let mut found = true;
        for match_item in &matches {
            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
                let exists = catalog_manager
                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
                    .await?;
                if !exists {
                    found = false;
                    break;
                }
            }
        }

        if found {
            schemas.push(schema);
        }
    }

    schemas.sort_unstable();

    Ok(schemas)
}
1558
1559/// Try to parse and extract the name of referenced metric from the promql query.
1560///
1561/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1562/// Multiple references to the same metric are allowed.
1563fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1564    let promql_expr = promql_parser::parser::parse(query).ok()?;
1565    promql_expr_to_metric_name(&promql_expr)
1566}
1567
/// Query parameters accepted by the Prometheus series endpoint; the handler
/// reads them from the query string and falls back to the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    // Time range; the handler defaults it to yesterday..now.
    start: Option<String>,
    end: Option<String>,
    // Lookback delta; defaults to DEFAULT_LOOKBACK_STRING.
    lookback: Option<String>,
    // `match[]` series selectors, flattened from the request.
    #[serde(flatten)]
    matches: Matches,
    // Optional database in `catalog-schema` string form.
    db: Option<String>,
}
1577
/// Handler for the Prometheus series HTTP API.
///
/// Evaluates every `match[]` selector as a PromQL query and accumulates the
/// resulting series (label maps) plus per-query metrics into one response.
/// Parameters may come from the query string or the form body; the query
/// string wins.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    // At least one `match[]` selector is required.
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time for this request on drop.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        // Schema errors degrade to skipping this query's results.
        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    // Expose the collected per-query metrics on the response.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1658
/// Parameters accepted by the Prometheus `parse_query` endpoint, supplied
/// either in the URL query string or in a form-encoded request body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    // The PromQL expression to parse.
    query: Option<String>,
    // Target database; not read by `parse_query` itself in the visible code.
    // NOTE(review): presumably kept for parity with the other Prometheus
    // endpoints that honor `db` — confirm before removing.
    db: Option<String>,
}
1664
1665#[axum_macros::debug_handler]
1666#[tracing::instrument(
1667    skip_all,
1668    fields(protocol = "prometheus", request_type = "parse_query")
1669)]
1670pub async fn parse_query(
1671    State(_handler): State<PrometheusHandlerRef>,
1672    Query(params): Query<ParseQuery>,
1673    Extension(_query_ctx): Extension<QueryContext>,
1674    Form(form_params): Form<ParseQuery>,
1675) -> PrometheusJsonResponse {
1676    if let Some(query) = params.query.or(form_params.query) {
1677        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1678        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1679    } else {
1680        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1681    }
1682}
1683
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// One table-driven case for `retrieve_metric_name_and_result_type`.
    struct TestCase {
        // Unique human-readable label used in assertion messages.
        name: &'static str,
        promql: &'static str,
        // Metric name expected to be extracted, if a single one exists.
        expected_metric: Option<&'static str>,
        // Expected PromQL result type; ignored when `should_error` is true.
        expected_type: ValueType,
        should_error: bool,
    }

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let test_cases = &[
            // Single metric cases
            TestCase {
                name: "simple metric",
                promql: "cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with selector",
                promql: r#"cpu_usage{instance="localhost"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with range selector",
                promql: "cpu_usage[5m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "metric with __name__ matcher",
                promql: r#"{__name__="cpu_usage"}"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with unary operator",
                promql: "-cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Aggregation and function cases
            TestCase {
                name: "metric with aggregation",
                promql: "sum(cpu_usage)",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // The next three cases previously shared the duplicate name
            // "complex aggregation"; they are now distinct so a failure
            // message identifies exactly which case failed.
            TestCase {
                name: "aggregation grouped by non-name label",
                promql: r#"sum by (instance) (cpu_usage{job="node"})"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation grouped by __name__",
                promql: r#"sum by (__name__) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "aggregation without label",
                promql: r#"sum without (instance) (cpu_usage{job="node"})"#,
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Same metric binary operations
            TestCase {
                name: "same metric addition",
                promql: "cpu_usage + cpu_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "metric with scalar addition",
                promql: r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Multiple metrics cases
            TestCase {
                name: "different metrics addition",
                promql: "cpu_usage + memory_usage",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "different metrics subtraction",
                promql: "network_in - network_out",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Unless operator cases
            TestCase {
                name: "unless with different metrics",
                promql: "cpu_usage unless memory_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            TestCase {
                name: "unless with same metric",
                promql: "cpu_usage unless cpu_usage",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Vector,
                should_error: false,
            },
            // Subquery cases
            TestCase {
                name: "basic subquery",
                promql: "cpu_usage[5m:1m]",
                expected_metric: Some("cpu_usage"),
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            TestCase {
                name: "subquery with multiple metrics",
                promql: "(cpu_usage + memory_usage)[5m:1m]",
                expected_metric: None,
                expected_type: ValueType::Matrix,
                should_error: false,
            },
            // Literal values
            TestCase {
                name: "scalar value",
                promql: "42",
                expected_metric: None,
                expected_type: ValueType::Scalar,
                should_error: false,
            },
            TestCase {
                name: "string literal",
                promql: r#""hello world""#,
                expected_metric: None,
                expected_type: ValueType::String,
                should_error: false,
            },
            // Error cases (expected_type is irrelevant here)
            TestCase {
                name: "invalid syntax",
                promql: "cpu_usage{invalid=",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "empty query",
                promql: "",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
            TestCase {
                name: "malformed brackets",
                promql: "cpu_usage[5m",
                expected_metric: None,
                expected_type: ValueType::Vector,
                should_error: true,
            },
        ];

        for test_case in test_cases {
            let result = retrieve_metric_name_and_result_type(test_case.promql);

            if test_case.should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    test_case.name,
                    result
                );
            } else {
                let (metric_name, value_type) = result.unwrap_or_else(|e| {
                    panic!(
                        "Test '{}' should have succeeded but failed with error: {}",
                        test_case.name, e
                    )
                });

                let expected_metric_name = test_case.expected_metric.map(|s| s.to_string());
                assert_eq!(
                    metric_name, expected_metric_name,
                    "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, expected_metric_name, metric_name
                );

                assert_eq!(
                    value_type, test_case.expected_type,
                    "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                    test_case.name, test_case.expected_type, value_type
                );
            }
        }
    }
}