servers/http/
prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! prom supply the prometheus HTTP API Server compliance
16
17use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::{
24    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
25    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
26    UInt8Type, UInt16Type, UInt32Type, UInt64Type,
27};
28use arrow_schema::{DataType, IntervalUnit};
29use axum::extract::{Path, Query, State};
30use axum::{Extension, Form};
31use catalog::CatalogManagerRef;
32use common_catalog::parse_catalog_and_schema_from_db_string;
33use common_decimal::Decimal128;
34use common_error::ext::ErrorExt;
35use common_error::status_code::StatusCode;
36use common_query::{Output, OutputData};
37use common_recordbatch::{RecordBatch, RecordBatches};
38use common_telemetry::{debug, tracing};
39use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
40use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
41use common_version::OwnedBuildInfo;
42use datafusion_common::ScalarValue;
43use datatypes::prelude::ConcreteDataType;
44use datatypes::schema::{ColumnSchema, SchemaRef};
45use datatypes::types::jsonb_to_string;
46use futures::StreamExt;
47use futures::future::join_all;
48use itertools::Itertools;
49use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
50use promql_parser::parser::token::{self};
51use promql_parser::parser::value::ValueType;
52use promql_parser::parser::{
53    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
54    SubqueryExpr, UnaryExpr, VectorSelector,
55};
56use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
57use serde::de::{self, MapAccess, Visitor};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use session::context::{QueryContext, QueryContextRef};
61use snafu::{Location, OptionExt, ResultExt};
62use store_api::metric_engine_consts::{
63    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
64};
65
66pub use super::result::prometheus_resp::PrometheusJsonResponse;
67use crate::error::{
68    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
69    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
70    UnexpectedResultSnafu,
71};
72use crate::http::header::collect_plan_metrics;
73use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label};
74use crate::prometheus_handler::PrometheusHandlerRef;
75
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name -> label value pairs identifying the series.
    pub metric: BTreeMap<String, String>,
    /// The single sample of the series as a (timestamp, stringified value)
    /// pair; omitted from the JSON output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
83
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name -> label value pairs identifying the series.
    pub metric: BTreeMap<String, String>,
    /// All samples of the series, each a (timestamp, stringified value) pair.
    pub values: Vec<(f64, String)>,
}
90
/// Variants corresponding to [ValueType]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// One entry per series, each carrying multiple samples.
    Matrix(Vec<PromSeriesMatrix>),
    /// One entry per series, each carrying a single sample.
    Vector(Vec<PromSeriesVector>),
    /// A single optional (timestamp, stringified value) sample.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// A single optional (timestamp, string) sample.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
100
101impl Default for PromQueryResult {
102    fn default() -> Self {
103        PromQueryResult::Matrix(Default::default())
104    }
105}
106
/// The `data` payload of a Prometheus query response.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    /// Stringified [ValueType] (e.g. "matrix", "vector").
    #[serde(rename = "resultType")]
    pub result_type: String,
    /// The actual result; its variant should agree with `result_type`.
    pub result: PromQueryResult,
}
113
/// A "holder" for the reference([Arc]) to a column name,
/// to help avoiding cloning [String]s when used as a [HashMap] key.
/// Cloning a [Column] only bumps the [Arc] refcount.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
118
119impl From<&str> for Column {
120    fn from(s: &str) -> Self {
121        Self(Arc::new(s.to_string()))
122    }
123}
124
/// Union of every payload shape the Prometheus-compatible endpoints return.
/// Serialized untagged, so each variant renders as its inner JSON directly.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    /// Result of an instant/range query.
    PromData(PromData),
    /// Label names, for the labels API.
    Labels(Vec<String>),
    /// Series label sets, for the series API.
    Series(Vec<HashMap<Column, String>>),
    /// Values of one label, for the label-values API.
    LabelValues(Vec<String>),
    /// Prettified PromQL, for the format-query API.
    FormatQuery(String),
    /// Server build information.
    BuildInfo(OwnedBuildInfo),
    // The parsed AST cannot be deserialized back, hence skip_deserializing.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}
139
140impl PrometheusResponse {
141    /// Append the other [`PrometheusResponse]`.
142    /// # NOTE
143    ///   Only append matrix and vector results, otherwise just ignore the other response.
144    fn append(&mut self, other: PrometheusResponse) {
145        match (self, other) {
146            (
147                PrometheusResponse::PromData(PromData {
148                    result: PromQueryResult::Matrix(lhs),
149                    ..
150                }),
151                PrometheusResponse::PromData(PromData {
152                    result: PromQueryResult::Matrix(rhs),
153                    ..
154                }),
155            ) => {
156                lhs.extend(rhs);
157            }
158
159            (
160                PrometheusResponse::PromData(PromData {
161                    result: PromQueryResult::Vector(lhs),
162                    ..
163                }),
164                PrometheusResponse::PromData(PromData {
165                    result: PromQueryResult::Vector(rhs),
166                    ..
167                }),
168            ) => {
169                lhs.extend(rhs);
170            }
171            _ => {
172                // TODO(dennis): process other cases?
173            }
174        }
175    }
176
177    pub fn is_none(&self) -> bool {
178        matches!(self, PrometheusResponse::None)
179    }
180}
181
/// Parameters accepted by the format-query API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    /// The PromQL expression to prettify.
    query: Option<String>,
}
186
187#[axum_macros::debug_handler]
188#[tracing::instrument(
189    skip_all,
190    fields(protocol = "prometheus", request_type = "format_query")
191)]
192pub async fn format_query(
193    State(_handler): State<PrometheusHandlerRef>,
194    Query(params): Query<InstantQuery>,
195    Extension(_query_ctx): Extension<QueryContext>,
196    Form(form_params): Form<InstantQuery>,
197) -> PrometheusJsonResponse {
198    let query = params.query.or(form_params.query).unwrap_or_default();
199    match promql_parser::parser::parse(&query) {
200        Ok(expr) => {
201            let pretty = expr.prettify();
202            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
203        }
204        Err(reason) => {
205            let err = InvalidQuerySnafu { reason }.build();
206            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
207        }
208    }
209}
210
/// Parameters for the build-info API (none are currently accepted).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
213
214#[axum_macros::debug_handler]
215#[tracing::instrument(
216    skip_all,
217    fields(protocol = "prometheus", request_type = "build_info_query")
218)]
219pub async fn build_info_query() -> PrometheusJsonResponse {
220    let build_info = common_version::build_info().clone();
221    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
222}
223
/// Parameters accepted by the instant-query API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    /// The PromQL expression to evaluate.
    query: Option<String>,
    /// Lookback delta; defaults to DEFAULT_LOOKBACK_STRING when absent.
    lookback: Option<String>,
    /// Evaluation timestamp; defaults to the current server time when absent.
    time: Option<String>,
    /// Evaluation timeout (currently unused by the handlers in this file).
    timeout: Option<String>,
    /// Target database in "catalog-schema" string form.
    db: Option<String>,
}
232
/// Helper macro which try to evaluate the expression and return its results.
/// If the evaluation fails, return a `PrometheusJsonResponse` early.
///
/// NOTE: the `return` exits the *enclosing handler function* with an
/// `InvalidArguments` status, so this macro must only be used inside
/// functions that return [PrometheusJsonResponse].
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
246
/// Handler for the Prometheus instant-query API.
///
/// Parameters may arrive via the query string or the form body; the query
/// string takes precedence. When the expression carries `__name__` matchers
/// that are not plain `=` (as reported by `find_metric_name_not_equal_matchers`),
/// the concrete metric names are resolved first and one rewritten sub-query is
/// executed per metric, with the results merged into a single response.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    // An instant query is modeled as a range query with start == end and a 1s step.
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    // NOTE(review): only the query-string `db` is honored here; a form-body
    // `db` is ignored — confirm this asymmetry is intended.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        // Resolve the concrete metric names the matchers select.
        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        // No matching metrics: return an empty result of the appropriate type.
        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        // Run one rewritten sub-query per metric name, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Rewrite the expression's name matcher to target `metric`.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: `metric_names` is non-empty (checked above), so there is at
        // least one response to reduce.
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}
338
339/// Executes a single instant query and returns response
340async fn do_instant_query(
341    handler: &PrometheusHandlerRef,
342    prom_query: &PromQuery,
343    query_ctx: QueryContextRef,
344) -> PrometheusJsonResponse {
345    let result = handler.do_query(prom_query, query_ctx).await;
346    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
347        Ok((metric_name, result_type)) => (metric_name, result_type),
348        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
349    };
350    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
351}
352
/// Parameters accepted by the range-query API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    /// The PromQL expression to evaluate.
    query: Option<String>,
    /// Start of the evaluated time range.
    start: Option<String>,
    /// End of the evaluated time range.
    end: Option<String>,
    /// Resolution step between evaluation points.
    step: Option<String>,
    /// Lookback delta; defaults to DEFAULT_LOOKBACK_STRING when absent.
    lookback: Option<String>,
    /// Evaluation timeout (currently unused by the handlers in this file).
    timeout: Option<String>,
    /// Target database in "catalog-schema" string form.
    db: Option<String>,
}
363
/// Handler for the Prometheus range-query API.
///
/// Parameters may arrive via the query string or the form body; the query
/// string takes precedence. Like `instant_query`, expressions whose
/// `__name__` matchers are not plain `=` are fanned out into one sub-query
/// per resolved metric name and the responses merged.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
        alias: None,
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    // NOTE(review): only the query-string `db` is honored here; a form-body
    // `db` is ignored — confirm this asymmetry is intended.
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        // Resolve the concrete metric names the matchers select.
        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        // No matching metrics: a range query always yields a (here empty) matrix.
        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        // Run one rewritten sub-query per metric name, concurrently.
        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                // Rewrite the expression's name matcher to target `metric`.
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one response, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}
448
449/// Executes a single range query and returns response
450async fn do_range_query(
451    handler: &PrometheusHandlerRef,
452    prom_query: &PromQuery,
453    query_ctx: QueryContextRef,
454) -> PrometheusJsonResponse {
455    let result = handler.do_query(prom_query, query_ctx).await;
456    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
457        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
458        Ok((metric_name, _)) => metric_name,
459    };
460    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
461}
462
/// Values collected from the repeated `match[]` query parameter
/// (deserialized by the custom [Deserialize] impl below).
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
465
/// Parameters accepted by the labels API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    /// Start of the evaluated time range; defaults to yesterday when absent.
    start: Option<String>,
    /// End of the evaluated time range; defaults to now when absent.
    end: Option<String>,
    /// Lookback delta; defaults to DEFAULT_LOOKBACK_STRING when absent.
    lookback: Option<String>,
    // Flattened so the repeated `match[]` keys reach Matches' custom
    // Deserialize implementation.
    #[serde(flatten)]
    matches: Matches,
    /// Target database in "catalog-schema" string form.
    db: Option<String>,
}
475
476// Custom Deserialize method to support parsing repeated match[]
477impl<'de> Deserialize<'de> for Matches {
478    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
479    where
480        D: de::Deserializer<'de>,
481    {
482        struct MatchesVisitor;
483
484        impl<'d> Visitor<'d> for MatchesVisitor {
485            type Value = Vec<String>;
486
487            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
488                formatter.write_str("a string")
489            }
490
491            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
492            where
493                M: MapAccess<'d>,
494            {
495                let mut matches = Vec::new();
496                while let Some((key, value)) = access.next_entry::<String, String>()? {
497                    if key == "match[]" {
498                        matches.push(value);
499                    }
500                }
501                Ok(matches)
502            }
503        }
504        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
505    }
506}
507
/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
///
/// NOTE: the error branch `return`s from the *enclosing handler function*, so
/// this macro must only be used inside functions that return
/// [PrometheusJsonResponse].
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Prometheus won't report error if querying nonexist label and metric
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
531
532#[axum_macros::debug_handler]
533#[tracing::instrument(
534    skip_all,
535    fields(protocol = "prometheus", request_type = "labels_query")
536)]
537pub async fn labels_query(
538    State(handler): State<PrometheusHandlerRef>,
539    Query(params): Query<LabelsQuery>,
540    Extension(mut query_ctx): Extension<QueryContext>,
541    Form(form_params): Form<LabelsQuery>,
542) -> PrometheusJsonResponse {
543    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
544    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
545    let query_ctx = Arc::new(query_ctx);
546
547    let mut queries = params.matches.0;
548    if queries.is_empty() {
549        queries = form_params.matches.0;
550    }
551
552    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
553        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
554        .start_timer();
555
556    // Fetch all tag columns. It will be used as white-list for tag names.
557    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
558    {
559        Ok(labels) => labels,
560        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
561    };
562    // insert the special metric name label
563    let _ = labels.insert(METRIC_NAME.to_string());
564
565    // Fetch all columns if no query matcher is provided
566    if queries.is_empty() {
567        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
568        labels_vec.sort_unstable();
569        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
570    }
571
572    // Otherwise, run queries and extract column name from result set.
573    let start = params
574        .start
575        .or(form_params.start)
576        .unwrap_or_else(yesterday_rfc3339);
577    let end = params
578        .end
579        .or(form_params.end)
580        .unwrap_or_else(current_time_rfc3339);
581    let lookback = params
582        .lookback
583        .or(form_params.lookback)
584        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
585
586    let mut fetched_labels = HashSet::new();
587    let _ = fetched_labels.insert(METRIC_NAME.to_string());
588
589    let mut merge_map = HashMap::new();
590    for query in queries {
591        let prom_query = PromQuery {
592            query,
593            start: start.clone(),
594            end: end.clone(),
595            step: DEFAULT_LOOKBACK_STRING.to_string(),
596            lookback: lookback.clone(),
597            alias: None,
598        };
599
600        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
601        handle_schema_err!(
602            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
603                .await
604        );
605    }
606
607    // intersect `fetched_labels` with `labels` to filter out non-tag columns
608    fetched_labels.retain(|l| labels.contains(l));
609    let _ = labels.insert(METRIC_NAME.to_string());
610
611    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
612    sorted_labels.sort();
613    let merge_map = merge_map
614        .into_iter()
615        .map(|(k, v)| (k, Value::from(v)))
616        .collect();
617    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
618    resp.resp_metrics = merge_map;
619    resp
620}
621
622/// Get all tag column name of the given schema
623async fn get_all_column_names(
624    catalog: &str,
625    schema: &str,
626    manager: &CatalogManagerRef,
627) -> std::result::Result<HashSet<String>, catalog::error::Error> {
628    let table_names = manager.table_names(catalog, schema, None).await?;
629
630    let mut labels = HashSet::new();
631    for table_name in table_names {
632        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
633            continue;
634        };
635        for column in table.primary_key_columns() {
636            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
637                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
638            {
639                labels.insert(column.name);
640            }
641        }
642    }
643
644    Ok(labels)
645}
646
647async fn retrieve_series_from_query_result(
648    result: Result<Output>,
649    series: &mut Vec<HashMap<Column, String>>,
650    query_ctx: &QueryContext,
651    table_name: &str,
652    manager: &CatalogManagerRef,
653    metrics: &mut HashMap<String, u64>,
654) -> Result<()> {
655    let result = result?;
656
657    // fetch tag list
658    let table = manager
659        .table(
660            query_ctx.current_catalog(),
661            &query_ctx.current_schema(),
662            table_name,
663            Some(query_ctx),
664        )
665        .await
666        .context(CatalogSnafu)?
667        .with_context(|| TableNotFoundSnafu {
668            catalog: query_ctx.current_catalog(),
669            schema: query_ctx.current_schema(),
670            table: table_name,
671        })?;
672    let tag_columns = table
673        .primary_key_columns()
674        .map(|c| c.name)
675        .collect::<HashSet<_>>();
676
677    match result.data {
678        OutputData::RecordBatches(batches) => {
679            record_batches_to_series(batches, series, table_name, &tag_columns)
680        }
681        OutputData::Stream(stream) => {
682            let batches = RecordBatches::try_collect(stream)
683                .await
684                .context(CollectRecordbatchSnafu)?;
685            record_batches_to_series(batches, series, table_name, &tag_columns)
686        }
687        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
688            reason: "expected data result, but got affected rows".to_string(),
689            location: Location::default(),
690        }),
691    }?;
692
693    if let Some(ref plan) = result.meta.plan {
694        collect_plan_metrics(plan, &mut [metrics]);
695    }
696    Ok(())
697}
698
699/// Retrieve labels name from query result
700async fn retrieve_labels_name_from_query_result(
701    result: Result<Output>,
702    labels: &mut HashSet<String>,
703    metrics: &mut HashMap<String, u64>,
704) -> Result<()> {
705    let result = result?;
706    match result.data {
707        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
708        OutputData::Stream(stream) => {
709            let batches = RecordBatches::try_collect(stream)
710                .await
711                .context(CollectRecordbatchSnafu)?;
712            record_batches_to_labels_name(batches, labels)
713        }
714        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
715            reason: "expected data result, but got affected rows".to_string(),
716        }
717        .fail(),
718    }?;
719    if let Some(ref plan) = result.meta.plan {
720        collect_plan_metrics(plan, &mut [metrics]);
721    }
722    Ok(())
723}
724
725fn record_batches_to_series(
726    batches: RecordBatches,
727    series: &mut Vec<HashMap<Column, String>>,
728    table_name: &str,
729    tag_columns: &HashSet<String>,
730) -> Result<()> {
731    for batch in batches.iter() {
732        // project record batch to only contains tag columns
733        let projection = batch
734            .schema
735            .column_schemas()
736            .iter()
737            .enumerate()
738            .filter_map(|(idx, col)| {
739                if tag_columns.contains(&col.name) {
740                    Some(idx)
741                } else {
742                    None
743                }
744            })
745            .collect::<Vec<_>>();
746        let batch = batch
747            .try_project(&projection)
748            .context(CollectRecordbatchSnafu)?;
749
750        let mut writer = RowWriter::new(&batch.schema, table_name);
751        writer.write(batch, series)?;
752    }
753    Ok(())
754}
755
/// Writer from a row in the record batch to a Prometheus time series:
///
/// `{__name__="<metric name>", <label name>="<label value>", ...}`
///
/// The metrics name is the table name; label names are the column names and
/// the label values are the corresponding row values (all are converted to strings).
struct RowWriter {
    /// The template that is to produce a Prometheus time series. It is pre-filled with metrics name
    /// and label names, waiting to be filled by row values afterward.
    template: HashMap<Column, Option<String>>,
    /// The current filling row.
    /// `None` between rows; lazily cloned from `template` on the first value
    /// inserted for a row.
    current: Option<HashMap<Column, Option<String>>>,
}
769
770impl RowWriter {
771    fn new(schema: &SchemaRef, table: &str) -> Self {
772        let mut template = schema
773            .column_schemas()
774            .iter()
775            .map(|x| (x.name.as_str().into(), None))
776            .collect::<HashMap<Column, Option<String>>>();
777        template.insert("__name__".into(), Some(table.to_string()));
778        Self {
779            template,
780            current: None,
781        }
782    }
783
    /// Sets the value for `column` in the row currently being filled.
    ///
    /// Lazily clones the template on the first insert of a row. If the column
    /// is not already in the template, it is added on the fly.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // Look the key up through a borrowed view to avoid allocating a new
        // key when it already exists — presumably `AsColumnRef` (defined
        // elsewhere in this file) bridges `ColumnRef` to the map's key type.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }
795
796    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
797        let column_name = column_schema.name.as_str().into();
798
799        if column_schema.data_type.is_json() {
800            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
801            self.insert(column_name, s);
802        } else {
803            let hex = bytes
804                .iter()
805                .map(|b| format!("{b:02x}"))
806                .collect::<Vec<String>>()
807                .join("");
808            self.insert(column_name, hex);
809        }
810        Ok(())
811    }
812
813    fn finish(&mut self) -> HashMap<Column, String> {
814        let Some(current) = self.current.take() else {
815            return HashMap::new();
816        };
817        current
818            .into_iter()
819            .filter_map(|(k, v)| v.map(|v| (k, v)))
820            .collect()
821    }
822
    /// Convert every row of `record_batch` into a `column -> string` map and
    /// append one map per row to `series`.
    ///
    /// Null slots are rendered as the literal string "Null". Binary columns are
    /// routed through `insert_bytes` so the column schema can influence their
    /// rendering; all other supported Arrow types are stringified via
    /// `self.insert`. Returns a `NotSupported` error for any other data type.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // Per-slot null check; `DataType::Null` arrays are handled in
                // the match below.
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
                        let v = datatypes::arrow_array::string_array_value(array, i);
                        self.insert(column, v);
                    }
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                        // Binary needs the column schema for rendering (e.g.
                        // JSON columns), hence the dedicated helper.
                        let v = datatypes::arrow_array::binary_array_value(array, i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // `Date64` values are milliseconds representation of `Date32` values,
                        // according to its specification. So we convert the `Date64` value here to
                        // the `Date32` value to process them unified.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    DataType::Timestamp(_, _) => {
                        let v = datatypes::arrow_array::timestamp_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(_) | DataType::Time64(_) => {
                        let v = datatypes::arrow_array::time_array_value(array, i);
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(_) => {
                        let d = datatypes::arrow_array::duration_array_value(array, i);
                        self.insert(column, d);
                    }
                    DataType::List(_) => {
                        // Nested types fall back to DataFusion's scalar Display.
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            // One complete row accumulated: flush it into the output.
            series.push(self.finish())
        }
        Ok(())
    }
975}
976
/// A cheap, borrowed counterpart of [`Column`] wrapping a `&str`.
///
/// Allows map lookups keyed by a plain string slice without allocating an
/// owned `Column` (see the `Borrow`/`Hash`/`Eq` machinery further below).
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);

impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}

/// Common view over owned (`Column`) and borrowed (`ColumnRef`) column keys.
trait AsColumnRef {
    fn as_ref(&self) -> ColumnRef<'_>;
}
989
impl AsColumnRef for Column {
    /// Borrow the owned column name as a `ColumnRef`.
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}

impl AsColumnRef for ColumnRef<'_> {
    /// Already the borrowed form; `ColumnRef` is `Copy`, so just copy it.
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}
1001
// Equality and hashing for `dyn AsColumnRef` delegate to the underlying
// `&str`, so an owned `Column` and a borrowed `ColumnRef` with the same name
// compare and hash identically. Together with the `Borrow` impl below this
// lets `HashMap<Column, _>` be queried by `&dyn AsColumnRef` built from a
// plain `&str`, without allocating a `Column` per lookup.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}

impl<'a> Eq for dyn AsColumnRef + 'a {}

impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash only the name string.
        // NOTE(review): for `Borrow`-based map lookups to be sound, `Column`'s
        // own `Hash` impl must also hash exactly the name — confirm at the
        // `Column` definition.
        self.as_ref().0.hash(state);
    }
}

impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1021
1022/// Retrieve labels name from record batches
1023fn record_batches_to_labels_name(
1024    batches: RecordBatches,
1025    labels: &mut HashSet<String>,
1026) -> Result<()> {
1027    let mut column_indices = Vec::new();
1028    let mut field_column_indices = Vec::new();
1029    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1030        if let ConcreteDataType::Float64(_) = column.data_type {
1031            field_column_indices.push(i);
1032        }
1033        column_indices.push(i);
1034    }
1035
1036    if field_column_indices.is_empty() {
1037        return Err(Error::Internal {
1038            err_msg: "no value column found".to_string(),
1039        });
1040    }
1041
1042    for batch in batches.iter() {
1043        let names = column_indices
1044            .iter()
1045            .map(|c| batches.schema().column_name_by_index(*c).to_string())
1046            .collect::<Vec<_>>();
1047
1048        let field_columns = field_column_indices
1049            .iter()
1050            .map(|i| {
1051                let column = batch.column(*i);
1052                column.as_primitive::<Float64Type>()
1053            })
1054            .collect::<Vec<_>>();
1055
1056        for row_index in 0..batch.num_rows() {
1057            // if all field columns are null, skip this row
1058            if field_columns.iter().all(|c| c.is_null(row_index)) {
1059                continue;
1060            }
1061
1062            // if a field is not null, record the tag name and return
1063            names.iter().for_each(|name| {
1064                let _ = labels.insert(name.clone());
1065            });
1066            return Ok(());
1067        }
1068    }
1069    Ok(())
1070}
1071
1072pub(crate) fn retrieve_metric_name_and_result_type(
1073    promql: &str,
1074) -> Result<(Option<String>, ValueType)> {
1075    let promql_expr = promql_parser::parser::parse(promql)
1076        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1077    let metric_name = promql_expr_to_metric_name(&promql_expr);
1078    let result_type = promql_expr.value_type();
1079
1080    Ok((metric_name, result_type))
1081}
1082
1083/// Tries to get catalog and schema from an optional db param. And retrieves
1084/// them from [QueryContext] if they don't present.
1085pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1086    if let Some(db) = db {
1087        parse_catalog_and_schema_from_db_string(db)
1088    } else {
1089        (
1090            ctx.current_catalog().to_string(),
1091            ctx.current_schema().clone(),
1092        )
1093    }
1094}
1095
1096/// Update catalog and schema in [QueryContext] if necessary.
1097pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1098    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1099        ctx.set_current_catalog(catalog);
1100        ctx.set_current_schema(schema);
1101    }
1102}
1103
1104fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1105    let mut metric_names = HashSet::new();
1106    collect_metric_names(expr, &mut metric_names);
1107
1108    // Return the metric name only if there's exactly one unique metric name
1109    if metric_names.len() == 1 {
1110        metric_names.into_iter().next()
1111    } else {
1112        None
1113    }
1114}
1115
/// Recursively collect all metric names from a PromQL expression
///
/// Invariant: whenever the expression can drop or rewrite the `__name__`
/// label (unary ops, most binary ops, aggregations whose grouping discards
/// `__name__`), the accumulated set is cleared so the caller sees the name as
/// indeterminate.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                // `by (...)` must keep `__name__` for the name to survive.
                Some(LabelModifier::Include(labels)) => {
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                // `without (...)` must not drop `__name__`.
                Some(LabelModifier::Exclude(labels)) => {
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operators keep the lhs samples (and their `__name__`);
            // arithmetic/comparison operators drop the metric name.
            if matches!(
                op.id(),
                token::T_LAND // INTERSECT
                    | token::T_LOR // UNION
                    | token::T_LUNLESS // EXCEPT
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                // No explicit name: fall back to a `__name__` matcher value.
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1175
1176fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1177where
1178    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1179{
1180    match expr {
1181        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1182        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1183        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1184            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1185        }
1186        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1187        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1188        PromqlExpr::NumberLiteral(_) => None,
1189        PromqlExpr::StringLiteral(_) => None,
1190        PromqlExpr::Extension(_) => None,
1191        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1192        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1193            let VectorSelector { name, matchers, .. } = vs;
1194
1195            f(name, matchers)
1196        }
1197        PromqlExpr::Call(Call { args, .. }) => args
1198            .args
1199            .iter()
1200            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1201    }
1202}
1203
1204/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
1205fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
1206    find_metric_name_and_matchers(expr, |name, matchers| {
1207        // Has name, ignore the matchers
1208        if name.is_some() {
1209            return None;
1210        }
1211
1212        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
1213        Some(matchers.find_matchers(METRIC_NAME))
1214    })
1215    .map(|matchers| {
1216        matchers
1217            .into_iter()
1218            .filter(|m| !matches!(m.op, MatchOp::Equal))
1219            .collect::<Vec<_>>()
1220    })
1221}
1222
1223/// Update the `__name__` matchers in expression into special value
1224/// Returns the updated expression.
1225fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1226    match expr {
1227        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1228            update_metric_name_matcher(expr, metric_name)
1229        }
1230        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1231        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1232            update_metric_name_matcher(lhs, metric_name);
1233            update_metric_name_matcher(rhs, metric_name);
1234        }
1235        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1236        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1237            update_metric_name_matcher(expr, metric_name)
1238        }
1239        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1240            if name.is_some() {
1241                return;
1242            }
1243
1244            for m in &mut matchers.matchers {
1245                if m.name == METRIC_NAME {
1246                    m.op = MatchOp::Equal;
1247                    m.value = metric_name.to_string();
1248                }
1249            }
1250        }
1251        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1252            let VectorSelector { name, matchers, .. } = vs;
1253            if name.is_some() {
1254                return;
1255            }
1256
1257            for m in &mut matchers.matchers {
1258                if m.name == METRIC_NAME {
1259                    m.op = MatchOp::Equal;
1260                    m.value = metric_name.to_string();
1261                }
1262            }
1263        }
1264        PromqlExpr::Call(Call { args, .. }) => {
1265            args.args.iter_mut().for_each(|e| {
1266                update_metric_name_matcher(e, metric_name);
1267            });
1268        }
1269        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1270    }
1271}
1272
/// Query parameters of the Prometheus label-values API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    // Optional timestamps bounding the query window; the handler defaults
    // them to yesterday..now when absent.
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    // `match[]` series selectors, flattened from repeated query parameters.
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
    // Maximum number of returned values; `None` or `Some(0)` means unlimited
    // (see `truncate_results`).
    limit: Option<usize>,
}
1283
/// Handler for the Prometheus label-values query.
///
/// Special label names are served from catalog metadata: `__name__` yields
/// table names, `__field__` yields field column names, and database-selection
/// labels yield schema names. Any other label is resolved by running the
/// `match[]` selectors through `query_label_values`.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time on drop.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        // `__name__` values are the (metric-engine) table names.
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Field label values come from the tables' field columns; schema
        // errors are tolerated and mapped to an empty result.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if is_database_selection_label(&label_name) {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    // Ordinary labels require at least one series selector to scope the scan.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        // Each selector must be a plain vector selector with a resolvable
        // metric name; anything else is rejected.
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use and filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    // De-duplicated by the HashSet; sort for stable output, then apply limit.
    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1383
/// Truncate `label_values` to at most `limit` entries.
///
/// A missing limit or a limit of zero means "no limit": the vector is left
/// untouched in those cases.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        Some(n) if n > 0 && label_values.len() >= n => label_values.truncate(n),
        _ => {}
    }
}
1392
1393/// Take metric name from the [VectorSelector].
1394/// It takes the name in the selector or removes the name matcher.
1395fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1396    if let Some(name) = selector.name.take() {
1397        return Some(name);
1398    }
1399
1400    let (pos, matcher) = selector
1401        .matchers
1402        .matchers
1403        .iter()
1404        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1405    let name = matcher.value.clone();
1406    // We need to remove the name matcher to avoid using it as a filter in query.
1407    selector.matchers.matchers.remove(pos);
1408
1409    Some(name)
1410}
1411
/// List the metric-engine table names of the current catalog/schema, filtered
/// by an optional `__name__` matcher taken from the first `match[]` selector.
///
/// Only `=` and `=~` matchers actually filter; other ops pass everything
/// through (see the comment in the match below). The result is sorted.
async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // we only provide very limited support for matcher against __name__
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // skip non-prometheus (non-metricengine) tables for __name__ query
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // != and !~ are not supported:
                    // vector must contains at least one non-empty matcher
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}
1483
1484async fn retrieve_field_names(
1485    query_ctx: &QueryContext,
1486    manager: CatalogManagerRef,
1487    matches: Vec<String>,
1488) -> Result<HashSet<String>> {
1489    let mut field_columns = HashSet::new();
1490    let catalog = query_ctx.current_catalog();
1491    let schema = query_ctx.current_schema();
1492
1493    if matches.is_empty() {
1494        // query all tables if no matcher is provided
1495        while let Some(table) = manager
1496            .tables(catalog, &schema, Some(query_ctx))
1497            .next()
1498            .await
1499        {
1500            let table = table.context(CatalogSnafu)?;
1501            for column in table.field_columns() {
1502                field_columns.insert(column.name);
1503            }
1504        }
1505        return Ok(field_columns);
1506    }
1507
1508    for table_name in matches {
1509        let table = manager
1510            .table(catalog, &schema, &table_name, Some(query_ctx))
1511            .await
1512            .context(CatalogSnafu)?
1513            .with_context(|| TableNotFoundSnafu {
1514                catalog: catalog.to_string(),
1515                schema: schema.clone(),
1516                table: table_name.clone(),
1517            })?;
1518
1519        for column in table.field_columns() {
1520            field_columns.insert(column.name);
1521        }
1522    }
1523    Ok(field_columns)
1524}
1525
1526async fn retrieve_schema_names(
1527    query_ctx: &QueryContext,
1528    catalog_manager: CatalogManagerRef,
1529    matches: Vec<String>,
1530) -> Result<Vec<String>> {
1531    let mut schemas = Vec::new();
1532    let catalog = query_ctx.current_catalog();
1533
1534    let candidate_schemas = catalog_manager
1535        .schema_names(catalog, Some(query_ctx))
1536        .await
1537        .context(CatalogSnafu)?;
1538
1539    for schema in candidate_schemas {
1540        let mut found = true;
1541        for match_item in &matches {
1542            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1543                let exists = catalog_manager
1544                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1545                    .await
1546                    .context(CatalogSnafu)?;
1547                if !exists {
1548                    found = false;
1549                    break;
1550                }
1551            }
1552        }
1553
1554        if found {
1555            schemas.push(schema);
1556        }
1557    }
1558
1559    schemas.sort_unstable();
1560
1561    Ok(schemas)
1562}
1563
1564/// Try to parse and extract the name of referenced metric from the promql query.
1565///
1566/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1567/// Multiple references to the same metric are allowed.
1568fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1569    let promql_expr = promql_parser::parser::parse(query).ok()?;
1570    promql_expr_to_metric_name(&promql_expr)
1571}
1572
/// Query parameters of the Prometheus series API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    // Optional timestamps bounding the query window; the handler defaults
    // them to yesterday..now when absent.
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    // `match[]` series selectors, flattened from repeated parameters.
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}
1582
/// Handler for the Prometheus series query.
///
/// Accepts parameters from either the URL query string or the form body (URL
/// values win); runs each `match[]` selector as a PromQL query and collects
/// the resulting series label sets.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    // URL parameters take precedence over form-body parameters.
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    // Records elapsed time on drop.
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
            alias: None,
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        // Schema errors are tolerated: a failing selector contributes nothing.
        handle_schema_err!(
            retrieve_series_from_query_result(
                result,
                &mut series,
                &query_ctx,
                &table_name,
                &handler.catalog_manager(),
                &mut merge_map,
            )
            .await
        );
    }
    // Surface the accumulated per-query metrics in the response.
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}
1663
/// Parameters accepted by the Prometheus `parse_query` endpoint, deserialized
/// either from URL query parameters or from an HTTP form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    // The PromQL expression to parse.
    query: Option<String>,
    // Target database name; not consumed by the visible `parse_query` handler —
    // presumably kept for API symmetry with the other endpoints. TODO confirm.
    db: Option<String>,
}
1669
1670#[axum_macros::debug_handler]
1671#[tracing::instrument(
1672    skip_all,
1673    fields(protocol = "prometheus", request_type = "parse_query")
1674)]
1675pub async fn parse_query(
1676    State(_handler): State<PrometheusHandlerRef>,
1677    Query(params): Query<ParseQuery>,
1678    Extension(_query_ctx): Extension<QueryContext>,
1679    Form(form_params): Form<ParseQuery>,
1680) -> PrometheusJsonResponse {
1681    if let Some(query) = params.query.or(form_params.query) {
1682        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1683        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1684    } else {
1685        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1686    }
1687}
1688
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    // Each case is (case name, PromQL input, expected metric name,
    // expected result value type, whether parsing should fail).
    type Case = (
        &'static str,
        &'static str,
        Option<&'static str>,
        ValueType,
        bool,
    );

    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        let cases: Vec<Case> = vec![
            // Single metric cases
            (
                "simple metric",
                "cpu_usage",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "metric with selector",
                r#"cpu_usage{instance="localhost"}"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "metric with range selector",
                "cpu_usage[5m]",
                Some("cpu_usage"),
                ValueType::Matrix,
                false,
            ),
            (
                "metric with __name__ matcher",
                r#"{__name__="cpu_usage"}"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "metric with unary operator",
                "-cpu_usage",
                None,
                ValueType::Vector,
                false,
            ),
            // Aggregation and function cases
            (
                "metric with aggregation",
                "sum(cpu_usage)",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "complex aggregation",
                r#"sum by (instance) (cpu_usage{job="node"})"#,
                None,
                ValueType::Vector,
                false,
            ),
            (
                "complex aggregation",
                r#"sum by (__name__) (cpu_usage{job="node"})"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "complex aggregation",
                r#"sum without (instance) (cpu_usage{job="node"})"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            // Same metric binary operations
            (
                "same metric addition",
                "cpu_usage + cpu_usage",
                None,
                ValueType::Vector,
                false,
            ),
            (
                "metric with scalar addition",
                r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                None,
                ValueType::Vector,
                false,
            ),
            // Multiple metrics cases
            (
                "different metrics addition",
                "cpu_usage + memory_usage",
                None,
                ValueType::Vector,
                false,
            ),
            (
                "different metrics subtraction",
                "network_in - network_out",
                None,
                ValueType::Vector,
                false,
            ),
            // Unless operator cases
            (
                "unless with different metrics",
                "cpu_usage unless memory_usage",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "unless with same metric",
                "cpu_usage unless cpu_usage",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            // Subquery cases
            (
                "basic subquery",
                "cpu_usage[5m:1m]",
                Some("cpu_usage"),
                ValueType::Matrix,
                false,
            ),
            (
                "subquery with multiple metrics",
                "(cpu_usage + memory_usage)[5m:1m]",
                None,
                ValueType::Matrix,
                false,
            ),
            // Literal values
            ("scalar value", "42", None, ValueType::Scalar, false),
            (
                "string literal",
                r#""hello world""#,
                None,
                ValueType::String,
                false,
            ),
            // Error cases
            (
                "invalid syntax",
                "cpu_usage{invalid=",
                None,
                ValueType::Vector,
                true,
            ),
            ("empty query", "", None, ValueType::Vector, true),
            (
                "malformed brackets",
                "cpu_usage[5m",
                None,
                ValueType::Vector,
                true,
            ),
        ];

        for (name, promql, expected_metric, expected_type, should_error) in cases {
            let result = retrieve_metric_name_and_result_type(promql);

            // Error cases: parsing must fail; nothing further to check.
            if should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    name,
                    result
                );
                continue;
            }

            let (metric_name, value_type) = result.unwrap_or_else(|e| {
                panic!(
                    "Test '{}' should have succeeded but failed with error: {}",
                    name, e
                )
            });

            let expected_metric_name = expected_metric.map(|s| s.to_string());
            assert_eq!(
                metric_name, expected_metric_name,
                "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                name, expected_metric_name, metric_name
            );

            assert_eq!(
                value_type, expected_type,
                "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                name, expected_type, value_type
            );
        }
    }
}