servers/http/
prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Handlers implementing the Prometheus-compatible HTTP API.
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::sync::Arc;
18
19use axum::extract::{Path, Query, State};
20use axum::{Extension, Form};
21use catalog::CatalogManagerRef;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::ErrorExt;
24use common_error::status_code::StatusCode;
25use common_query::{Output, OutputData};
26use common_recordbatch::RecordBatches;
27use common_telemetry::{debug, tracing};
28use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
29use common_version::OwnedBuildInfo;
30use datatypes::prelude::ConcreteDataType;
31use datatypes::scalars::ScalarVector;
32use datatypes::vectors::Float64Vector;
33use futures::future::join_all;
34use futures::StreamExt;
35use itertools::Itertools;
36use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
37use promql_parser::parser::value::ValueType;
38use promql_parser::parser::{
39    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
40    UnaryExpr, VectorSelector,
41};
42use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
43use query::promql::planner::normalize_matcher;
44use serde::de::{self, MapAccess, Visitor};
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use session::context::{QueryContext, QueryContextRef};
48use snafu::{Location, OptionExt, ResultExt};
49use store_api::metric_engine_consts::{
50    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
51};
52
53pub use super::result::prometheus_resp::PrometheusJsonResponse;
54use crate::error::{
55    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
56    TableNotFoundSnafu, UnexpectedResultSnafu,
57};
58use crate::http::header::collect_plan_metrics;
59use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
60use crate::prometheus_handler::PrometheusHandlerRef;
61
/// For [ValueType::Vector] result type
///
/// One series of an instant-vector result: its label set plus at most one sample.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name to label value; a `BTreeMap` keeps the keys in sorted order.
    pub metric: BTreeMap<String, String>,
    /// The single sample of this series. The field is left out of the serialized
    /// output entirely when it is `None`.
    // NOTE(review): presumably `(timestamp, value rendered as text)` — confirm against the producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
69
/// For [ValueType::Matrix] result type
///
/// One series of a range (matrix) result: its label set plus a list of samples.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name to label value; a `BTreeMap` keeps the keys in sorted order.
    pub metric: BTreeMap<String, String>,
    /// Samples of this series.
    // NOTE(review): presumably `(timestamp, value rendered as text)` pairs — confirm against the producer.
    pub values: Vec<(f64, String)>,
}
76
/// Variants corresponding to [ValueType]
///
/// The enum is `untagged`, so only the payload of the active variant is
/// serialized; the variant name never appears in the output.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// Series carrying a list of samples each.
    Matrix(Vec<PromSeriesMatrix>),
    /// Series carrying at most one sample each.
    Vector(Vec<PromSeriesVector>),
    /// A single optional sample.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// A single optional sample; same payload shape as `Scalar`.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
86
87impl Default for PromQueryResult {
88    fn default() -> Self {
89        PromQueryResult::Matrix(Default::default())
90    }
91}
92
/// The `data` payload of a query response: the result type name together with
/// the result itself.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    /// Textual name of the result type; serialized under the key `resultType`.
    #[serde(rename = "resultType")]
    pub result_type: String,
    /// The query result. Defaults to an empty matrix.
    pub result: PromQueryResult,
}
99
/// All payload shapes that the handlers in this module place in a
/// [`PrometheusJsonResponse`].
///
/// The enum is `untagged`: only the payload of the active variant is
/// serialized, without any variant name.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    /// Result of an instant or range query.
    PromData(PromData),
    /// A list of label names.
    Labels(Vec<String>),
    /// A list of series, each described by its label name/value pairs.
    Series(Vec<HashMap<String, String>>),
    /// A list of label values.
    // NOTE(review): same payload shape as `Labels`; with untagged deserialization
    // this variant is presumably never produced when reading — confirm if that matters.
    LabelValues(Vec<String>),
    /// A prettified PromQL query string.
    FormatQuery(String),
    /// Build information of the running binary.
    BuildInfo(OwnedBuildInfo),
    /// A parsed PromQL expression. Serialize-only: it is skipped when deserializing.
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    /// No payload; this is the default variant.
    #[default]
    None,
}
114
115impl PrometheusResponse {
116    /// Append the other [`PrometheusResponse]`.
117    /// # NOTE
118    ///   Only append matrix and vector results, otherwise just ignore the other response.
119    fn append(&mut self, other: PrometheusResponse) {
120        match (self, other) {
121            (
122                PrometheusResponse::PromData(PromData {
123                    result: PromQueryResult::Matrix(lhs),
124                    ..
125                }),
126                PrometheusResponse::PromData(PromData {
127                    result: PromQueryResult::Matrix(rhs),
128                    ..
129                }),
130            ) => {
131                lhs.extend(rhs);
132            }
133
134            (
135                PrometheusResponse::PromData(PromData {
136                    result: PromQueryResult::Vector(lhs),
137                    ..
138                }),
139                PrometheusResponse::PromData(PromData {
140                    result: PromQueryResult::Vector(rhs),
141                    ..
142                }),
143            ) => {
144                lhs.extend(rhs);
145            }
146            _ => {
147                // TODO(dennis): process other cases?
148            }
149        }
150    }
151
152    pub fn is_none(&self) -> bool {
153        matches!(self, PrometheusResponse::None)
154    }
155}
156
/// Request parameters carrying a single optional PromQL query.
// NOTE(review): not referenced by the handlers visible in this part of the
// file (`format_query` extracts `InstantQuery` instead) — confirm whether it is used elsewhere.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    /// The PromQL query text, if supplied.
    query: Option<String>,
}
161
162#[axum_macros::debug_handler]
163#[tracing::instrument(
164    skip_all,
165    fields(protocol = "prometheus", request_type = "format_query")
166)]
167pub async fn format_query(
168    State(_handler): State<PrometheusHandlerRef>,
169    Query(params): Query<InstantQuery>,
170    Extension(_query_ctx): Extension<QueryContext>,
171    Form(form_params): Form<InstantQuery>,
172) -> PrometheusJsonResponse {
173    let query = params.query.or(form_params.query).unwrap_or_default();
174    match promql_parser::parser::parse(&query) {
175        Ok(expr) => {
176            let pretty = expr.prettify();
177            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
178        }
179        Err(reason) => {
180            let err = InvalidQuerySnafu { reason }.build();
181            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
182        }
183    }
184}
185
/// Parameter type for the build-info endpoint; it has no fields.
// NOTE(review): `build_info_query` below takes no extractor of this type —
// confirm whether it is used elsewhere.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
188
189#[axum_macros::debug_handler]
190#[tracing::instrument(
191    skip_all,
192    fields(protocol = "prometheus", request_type = "build_info_query")
193)]
194pub async fn build_info_query() -> PrometheusJsonResponse {
195    let build_info = common_version::build_info().clone();
196    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
197}
198
/// Parameters of an instant query. The handlers extract this type both from
/// the URL query string and from the form body, preferring the URL value.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    /// PromQL query text; handlers fall back to the empty string when absent.
    query: Option<String>,
    /// Lookback setting; `instant_query` falls back to `DEFAULT_LOOKBACK_STRING`.
    lookback: Option<String>,
    /// Evaluation time; `instant_query` falls back to the current server time.
    time: Option<String>,
    /// Accepted but not read by the handlers visible in this part of the file.
    timeout: Option<String>,
    /// Database string; when present in the URL parameters, `instant_query`
    /// switches the query context to the catalog and schema parsed from it.
    db: Option<String>,
}
207
/// Unwraps the `Ok` value of the given expression.
///
/// On `Err`, the enclosing function returns early with a
/// `PrometheusJsonResponse` error whose status code is
/// `StatusCode::InvalidArguments` and whose message is the error rendered
/// with `to_string()`.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(value) => value,
            Err(err) => {
                return PrometheusJsonResponse::error(
                    StatusCode::InvalidArguments,
                    err.to_string(),
                );
            }
        }
    };
}
221
222#[axum_macros::debug_handler]
223#[tracing::instrument(
224    skip_all,
225    fields(protocol = "prometheus", request_type = "instant_query")
226)]
227pub async fn instant_query(
228    State(handler): State<PrometheusHandlerRef>,
229    Query(params): Query<InstantQuery>,
230    Extension(mut query_ctx): Extension<QueryContext>,
231    Form(form_params): Form<InstantQuery>,
232) -> PrometheusJsonResponse {
233    // Extract time from query string, or use current server time if not specified.
234    let time = params
235        .time
236        .or(form_params.time)
237        .unwrap_or_else(current_time_rfc3339);
238    let prom_query = PromQuery {
239        query: params.query.or(form_params.query).unwrap_or_default(),
240        start: time.clone(),
241        end: time,
242        step: "1s".to_string(),
243        lookback: params
244            .lookback
245            .or(form_params.lookback)
246            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
247    };
248
249    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
250
251    // update catalog and schema in query context if necessary
252    if let Some(db) = &params.db {
253        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
254        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
255    }
256    let query_ctx = Arc::new(query_ctx);
257
258    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
259        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
260        .start_timer();
261
262    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
263        && !name_matchers.is_empty()
264    {
265        debug!("Find metric name matchers: {:?}", name_matchers);
266
267        let metric_names =
268            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
269
270        debug!("Find metric names: {:?}", metric_names);
271
272        if metric_names.is_empty() {
273            let result_type = promql_expr.value_type();
274
275            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
276                result_type: result_type.to_string(),
277                ..Default::default()
278            }));
279        }
280
281        let responses = join_all(metric_names.into_iter().map(|metric| {
282            let mut prom_query = prom_query.clone();
283            let mut promql_expr = promql_expr.clone();
284            let query_ctx = query_ctx.clone();
285            let handler = handler.clone();
286
287            async move {
288                update_metric_name_matcher(&mut promql_expr, &metric);
289                let new_query = promql_expr.to_string();
290                debug!(
291                    "Updated promql, before: {}, after: {}",
292                    &prom_query.query, new_query
293                );
294                prom_query.query = new_query;
295
296                do_instant_query(&handler, &prom_query, query_ctx).await
297            }
298        }))
299        .await;
300
301        responses
302            .into_iter()
303            .reduce(|mut acc, resp| {
304                acc.data.append(resp.data);
305                acc
306            })
307            .unwrap()
308    } else {
309        do_instant_query(&handler, &prom_query, query_ctx).await
310    }
311}
312
313/// Executes a single instant query and returns response
314async fn do_instant_query(
315    handler: &PrometheusHandlerRef,
316    prom_query: &PromQuery,
317    query_ctx: QueryContextRef,
318) -> PrometheusJsonResponse {
319    let result = handler.do_query(prom_query, query_ctx).await;
320    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
321        Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
322        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
323    };
324    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
325}
326
/// Parameters of a range query. `range_query` extracts this type both from the
/// URL query string and from the form body, preferring the URL value.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    /// PromQL query text; falls back to the empty string when absent.
    query: Option<String>,
    /// Range start; falls back to the empty string when absent.
    start: Option<String>,
    /// Range end; falls back to the empty string when absent.
    end: Option<String>,
    /// Query step; falls back to the empty string when absent.
    step: Option<String>,
    /// Lookback setting; falls back to `DEFAULT_LOOKBACK_STRING`.
    lookback: Option<String>,
    /// Accepted but not read by the handler visible in this part of the file.
    timeout: Option<String>,
    /// Database string; when present in the URL parameters, the query context
    /// is switched to the catalog and schema parsed from it.
    db: Option<String>,
}
337
338#[axum_macros::debug_handler]
339#[tracing::instrument(
340    skip_all,
341    fields(protocol = "prometheus", request_type = "range_query")
342)]
343pub async fn range_query(
344    State(handler): State<PrometheusHandlerRef>,
345    Query(params): Query<RangeQuery>,
346    Extension(mut query_ctx): Extension<QueryContext>,
347    Form(form_params): Form<RangeQuery>,
348) -> PrometheusJsonResponse {
349    let prom_query = PromQuery {
350        query: params.query.or(form_params.query).unwrap_or_default(),
351        start: params.start.or(form_params.start).unwrap_or_default(),
352        end: params.end.or(form_params.end).unwrap_or_default(),
353        step: params.step.or(form_params.step).unwrap_or_default(),
354        lookback: params
355            .lookback
356            .or(form_params.lookback)
357            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
358    };
359
360    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
361
362    // update catalog and schema in query context if necessary
363    if let Some(db) = &params.db {
364        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
365        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
366    }
367    let query_ctx = Arc::new(query_ctx);
368    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
369        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
370        .start_timer();
371
372    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
373        && !name_matchers.is_empty()
374    {
375        debug!("Find metric name matchers: {:?}", name_matchers);
376
377        let metric_names =
378            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
379
380        debug!("Find metric names: {:?}", metric_names);
381
382        if metric_names.is_empty() {
383            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
384                result_type: ValueType::Matrix.to_string(),
385                ..Default::default()
386            }));
387        }
388
389        let responses = join_all(metric_names.into_iter().map(|metric| {
390            let mut prom_query = prom_query.clone();
391            let mut promql_expr = promql_expr.clone();
392            let query_ctx = query_ctx.clone();
393            let handler = handler.clone();
394
395            async move {
396                update_metric_name_matcher(&mut promql_expr, &metric);
397                let new_query = promql_expr.to_string();
398                debug!(
399                    "Updated promql, before: {}, after: {}",
400                    &prom_query.query, new_query
401                );
402                prom_query.query = new_query;
403
404                do_range_query(&handler, &prom_query, query_ctx).await
405            }
406        }))
407        .await;
408
409        // Safety: at least one responses, checked above
410        responses
411            .into_iter()
412            .reduce(|mut acc, resp| {
413                acc.data.append(resp.data);
414                acc
415            })
416            .unwrap()
417    } else {
418        do_range_query(&handler, &prom_query, query_ctx).await
419    }
420}
421
422/// Executes a single range query and returns response
423async fn do_range_query(
424    handler: &PrometheusHandlerRef,
425    prom_query: &PromQuery,
426    query_ctx: QueryContextRef,
427) -> PrometheusJsonResponse {
428    let result = handler.do_query(prom_query, query_ctx).await;
429    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
430        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
431        Ok((metric_name, _)) => metric_name.unwrap_or_default(),
432    };
433    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
434}
435
/// The values of all `match[]` request parameters, in the order they were read.
/// `Deserialize` is implemented by hand so that repeated keys are collected.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
438
/// Parameters of a labels query. `labels_query` extracts this type both from
/// the URL query string and from the form body, preferring the URL value.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    /// Range start; `labels_query` falls back to `yesterday_rfc3339()`.
    start: Option<String>,
    /// Range end; `labels_query` falls back to `current_time_rfc3339()`.
    end: Option<String>,
    /// Lookback setting; `labels_query` falls back to `DEFAULT_LOOKBACK_STRING`.
    lookback: Option<String>,
    /// All `match[]` parameters. Flattened so the custom deserializer sees the
    /// parameter map and can collect repeated keys.
    #[serde(flatten)]
    matches: Matches,
    /// Database string naming the catalog and schema to inspect; only the URL
    /// value is consulted by `labels_query`.
    db: Option<String>,
}
448
449// Custom Deserialize method to support parsing repeated match[]
450impl<'de> Deserialize<'de> for Matches {
451    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
452    where
453        D: de::Deserializer<'de>,
454    {
455        struct MatchesVisitor;
456
457        impl<'d> Visitor<'d> for MatchesVisitor {
458            type Value = Vec<String>;
459
460            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
461                formatter.write_str("a string")
462            }
463
464            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
465            where
466                M: MapAccess<'d>,
467            {
468                let mut matches = Vec::new();
469                while let Some((key, value)) = access.next_entry::<String, String>()? {
470                    if key == "match[]" {
471                        matches.push(value);
472                    }
473                }
474                Ok(matches)
475            }
476        }
477        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
478    }
479}
480
/// Turns a `Result` into an `Option`, tolerating missing tables and columns.
/// - `Ok(v)` becomes `Some(v)`.
/// - `Err(err)` whose status code is `TableNotFound` or `TableColumnNotFound`
///   becomes `None`.
/// - Any other `Err(err)` makes the enclosing function return a
///   `PrometheusJsonResponse::error` built from that error.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            // Prometheus won't report error if querying nonexist label and metric
            Err(err)
                if matches!(
                    err.status_code(),
                    StatusCode::TableNotFound | StatusCode::TableColumnNotFound
                ) =>
            {
                None
            }
            Err(err) => {
                return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
            }
        }
    };
}
504
505#[axum_macros::debug_handler]
506#[tracing::instrument(
507    skip_all,
508    fields(protocol = "prometheus", request_type = "labels_query")
509)]
510pub async fn labels_query(
511    State(handler): State<PrometheusHandlerRef>,
512    Query(params): Query<LabelsQuery>,
513    Extension(mut query_ctx): Extension<QueryContext>,
514    Form(form_params): Form<LabelsQuery>,
515) -> PrometheusJsonResponse {
516    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
517    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
518    let query_ctx = Arc::new(query_ctx);
519
520    let mut queries = params.matches.0;
521    if queries.is_empty() {
522        queries = form_params.matches.0;
523    }
524
525    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
526        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
527        .start_timer();
528
529    // Fetch all tag columns. It will be used as white-list for tag names.
530    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
531    {
532        Ok(labels) => labels,
533        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
534    };
535    // insert the special metric name label
536    let _ = labels.insert(METRIC_NAME.to_string());
537
538    // Fetch all columns if no query matcher is provided
539    if queries.is_empty() {
540        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
541        labels_vec.sort_unstable();
542        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
543    }
544
545    // Otherwise, run queries and extract column name from result set.
546    let start = params
547        .start
548        .or(form_params.start)
549        .unwrap_or_else(yesterday_rfc3339);
550    let end = params
551        .end
552        .or(form_params.end)
553        .unwrap_or_else(current_time_rfc3339);
554    let lookback = params
555        .lookback
556        .or(form_params.lookback)
557        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
558
559    let mut fetched_labels = HashSet::new();
560    let _ = fetched_labels.insert(METRIC_NAME.to_string());
561
562    let mut merge_map = HashMap::new();
563    for query in queries {
564        let prom_query = PromQuery {
565            query,
566            start: start.clone(),
567            end: end.clone(),
568            step: DEFAULT_LOOKBACK_STRING.to_string(),
569            lookback: lookback.clone(),
570        };
571
572        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
573        handle_schema_err!(
574            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
575                .await
576        );
577    }
578
579    // intersect `fetched_labels` with `labels` to filter out non-tag columns
580    fetched_labels.retain(|l| labels.contains(l));
581    let _ = labels.insert(METRIC_NAME.to_string());
582
583    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
584    sorted_labels.sort();
585    let merge_map = merge_map
586        .into_iter()
587        .map(|(k, v)| (k, Value::from(v)))
588        .collect();
589    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
590    resp.resp_metrics = merge_map;
591    resp
592}
593
594/// Get all tag column name of the given schema
595async fn get_all_column_names(
596    catalog: &str,
597    schema: &str,
598    manager: &CatalogManagerRef,
599) -> std::result::Result<HashSet<String>, catalog::error::Error> {
600    let table_names = manager.table_names(catalog, schema, None).await?;
601
602    let mut labels = HashSet::new();
603    for table_name in table_names {
604        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
605            continue;
606        };
607        for column in table.primary_key_columns() {
608            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
609                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
610            {
611                labels.insert(column.name);
612            }
613        }
614    }
615
616    Ok(labels)
617}
618
619async fn retrieve_series_from_query_result(
620    result: Result<Output>,
621    series: &mut Vec<HashMap<String, String>>,
622    query_ctx: &QueryContext,
623    table_name: &str,
624    manager: &CatalogManagerRef,
625    metrics: &mut HashMap<String, u64>,
626) -> Result<()> {
627    let result = result?;
628
629    // fetch tag list
630    let table = manager
631        .table(
632            query_ctx.current_catalog(),
633            &query_ctx.current_schema(),
634            table_name,
635            Some(query_ctx),
636        )
637        .await
638        .context(CatalogSnafu)?
639        .with_context(|| TableNotFoundSnafu {
640            catalog: query_ctx.current_catalog(),
641            schema: query_ctx.current_schema(),
642            table: table_name,
643        })?;
644    let tag_columns = table
645        .primary_key_columns()
646        .map(|c| c.name)
647        .collect::<HashSet<_>>();
648
649    match result.data {
650        OutputData::RecordBatches(batches) => {
651            record_batches_to_series(batches, series, table_name, &tag_columns)
652        }
653        OutputData::Stream(stream) => {
654            let batches = RecordBatches::try_collect(stream)
655                .await
656                .context(CollectRecordbatchSnafu)?;
657            record_batches_to_series(batches, series, table_name, &tag_columns)
658        }
659        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
660            reason: "expected data result, but got affected rows".to_string(),
661            location: Location::default(),
662        }),
663    }?;
664
665    if let Some(ref plan) = result.meta.plan {
666        collect_plan_metrics(plan, &mut [metrics]);
667    }
668    Ok(())
669}
670
671/// Retrieve labels name from query result
672async fn retrieve_labels_name_from_query_result(
673    result: Result<Output>,
674    labels: &mut HashSet<String>,
675    metrics: &mut HashMap<String, u64>,
676) -> Result<()> {
677    let result = result?;
678    match result.data {
679        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
680        OutputData::Stream(stream) => {
681            let batches = RecordBatches::try_collect(stream)
682                .await
683                .context(CollectRecordbatchSnafu)?;
684            record_batches_to_labels_name(batches, labels)
685        }
686        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
687            reason: "expected data result, but got affected rows".to_string(),
688        }
689        .fail(),
690    }?;
691    if let Some(ref plan) = result.meta.plan {
692        collect_plan_metrics(plan, &mut [metrics]);
693    }
694    Ok(())
695}
696
697fn record_batches_to_series(
698    batches: RecordBatches,
699    series: &mut Vec<HashMap<String, String>>,
700    table_name: &str,
701    tag_columns: &HashSet<String>,
702) -> Result<()> {
703    for batch in batches.iter() {
704        // project record batch to only contains tag columns
705        let projection = batch
706            .schema
707            .column_schemas()
708            .iter()
709            .enumerate()
710            .filter_map(|(idx, col)| {
711                if tag_columns.contains(&col.name) {
712                    Some(idx)
713                } else {
714                    None
715                }
716            })
717            .collect::<Vec<_>>();
718        let batch = batch
719            .try_project(&projection)
720            .context(CollectRecordbatchSnafu)?;
721
722        for row in batch.rows() {
723            let mut element: HashMap<String, String> = row
724                .iter()
725                .enumerate()
726                .map(|(idx, column)| {
727                    let column_name = batch.schema.column_name_by_index(idx);
728                    (column_name.to_string(), column.to_string())
729                })
730                .collect();
731            let _ = element.insert("__name__".to_string(), table_name.to_string());
732            series.push(element);
733        }
734    }
735    Ok(())
736}
737
738/// Retrieve labels name from record batches
739fn record_batches_to_labels_name(
740    batches: RecordBatches,
741    labels: &mut HashSet<String>,
742) -> Result<()> {
743    let mut column_indices = Vec::new();
744    let mut field_column_indices = Vec::new();
745    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
746        if let ConcreteDataType::Float64(_) = column.data_type {
747            field_column_indices.push(i);
748        }
749        column_indices.push(i);
750    }
751
752    if field_column_indices.is_empty() {
753        return Err(Error::Internal {
754            err_msg: "no value column found".to_string(),
755        });
756    }
757
758    for batch in batches.iter() {
759        let names = column_indices
760            .iter()
761            .map(|c| batches.schema().column_name_by_index(*c).to_string())
762            .collect::<Vec<_>>();
763
764        let field_columns = field_column_indices
765            .iter()
766            .map(|i| {
767                batch
768                    .column(*i)
769                    .as_any()
770                    .downcast_ref::<Float64Vector>()
771                    .unwrap()
772            })
773            .collect::<Vec<_>>();
774
775        for row_index in 0..batch.num_rows() {
776            // if all field columns are null, skip this row
777            if field_columns
778                .iter()
779                .all(|c| c.get_data(row_index).is_none())
780            {
781                continue;
782            }
783
784            // if a field is not null, record the tag name and return
785            names.iter().for_each(|name| {
786                let _ = labels.insert(name.to_string());
787            });
788            return Ok(());
789        }
790    }
791    Ok(())
792}
793
794pub(crate) fn retrieve_metric_name_and_result_type(
795    promql: &str,
796) -> Result<(Option<String>, ValueType)> {
797    let promql_expr = promql_parser::parser::parse(promql)
798        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
799    let metric_name = promql_expr_to_metric_name(&promql_expr);
800    let result_type = promql_expr.value_type();
801
802    Ok((metric_name, result_type))
803}
804
805/// Tries to get catalog and schema from an optional db param. And retrieves
806/// them from [QueryContext] if they don't present.
807pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
808    if let Some(db) = db {
809        parse_catalog_and_schema_from_db_string(db)
810    } else {
811        (
812            ctx.current_catalog().to_string(),
813            ctx.current_schema().to_string(),
814        )
815    }
816}
817
818/// Update catalog and schema in [QueryContext] if necessary.
819pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
820    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
821        ctx.set_current_catalog(catalog);
822        ctx.set_current_schema(schema);
823    }
824}
825
826fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
827    find_metric_name_and_matchers(expr, |name, matchers| {
828        name.clone().or(matchers
829            .find_matchers(METRIC_NAME)
830            .into_iter()
831            .next()
832            .map(|m| m.value))
833    })
834}
835
836fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
837where
838    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
839{
840    match expr {
841        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
842        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
843        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
844            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
845        }
846        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
847        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
848        PromqlExpr::NumberLiteral(_) => None,
849        PromqlExpr::StringLiteral(_) => None,
850        PromqlExpr::Extension(_) => None,
851        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
852        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
853            let VectorSelector { name, matchers, .. } = vs;
854
855            f(name, matchers)
856        }
857        PromqlExpr::Call(Call { args, .. }) => args
858            .args
859            .iter()
860            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
861    }
862}
863
864/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
865fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
866    find_metric_name_and_matchers(expr, |name, matchers| {
867        // Has name, ignore the matchers
868        if name.is_some() {
869            return None;
870        }
871
872        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
873        Some(matchers.find_matchers(METRIC_NAME))
874    })
875    .map(|matchers| {
876        matchers
877            .into_iter()
878            .filter(|m| !matches!(m.op, MatchOp::Equal))
879            .map(normalize_matcher)
880            .collect::<Vec<_>>()
881    })
882}
883
884/// Update the `__name__` matchers in expression into special value
885/// Returns the updated expression.
886fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
887    match expr {
888        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
889            update_metric_name_matcher(expr, metric_name)
890        }
891        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
892        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
893            update_metric_name_matcher(lhs, metric_name);
894            update_metric_name_matcher(rhs, metric_name);
895        }
896        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
897        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
898            update_metric_name_matcher(expr, metric_name)
899        }
900        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
901            if name.is_some() {
902                return;
903            }
904
905            for m in &mut matchers.matchers {
906                if m.name == METRIC_NAME {
907                    m.op = MatchOp::Equal;
908                    m.value = metric_name.to_string();
909                }
910            }
911        }
912        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
913            let VectorSelector { name, matchers, .. } = vs;
914            if name.is_some() {
915                return;
916            }
917
918            for m in &mut matchers.matchers {
919                if m.name == METRIC_NAME {
920                    m.op = MatchOp::Equal;
921                    m.value = metric_name.to_string();
922                }
923            }
924        }
925        PromqlExpr::Call(Call { args, .. }) => {
926            args.args.iter_mut().for_each(|e| {
927                update_metric_name_matcher(e, metric_name);
928            });
929        }
930        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
931    }
932}
933
/// Query-string parameters accepted by [`label_values_query`].
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    /// Start of the queried time range; the handler falls back to
    /// `yesterday_rfc3339()` when it is absent.
    start: Option<String>,
    /// End of the queried time range; the handler falls back to
    /// `current_time_rfc3339()` when it is absent.
    end: Option<String>,
    // NOTE(review): `lookback` is not read by `label_values_query` in the
    // code visible here — confirm whether it is still needed.
    lookback: Option<String>,
    /// Selectors gathered from the flattened remainder of the query string;
    /// the handler refers to them as the `match[]` parameter.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database string used to resolve the catalog and schema.
    db: Option<String>,
}
943
944#[axum_macros::debug_handler]
945#[tracing::instrument(
946    skip_all,
947    fields(protocol = "prometheus", request_type = "label_values_query")
948)]
949pub async fn label_values_query(
950    State(handler): State<PrometheusHandlerRef>,
951    Path(label_name): Path<String>,
952    Extension(mut query_ctx): Extension<QueryContext>,
953    Query(params): Query<LabelValueQuery>,
954) -> PrometheusJsonResponse {
955    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
956    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
957    let query_ctx = Arc::new(query_ctx);
958
959    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
960        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
961        .start_timer();
962
963    if label_name == METRIC_NAME_LABEL {
964        let catalog_manager = handler.catalog_manager();
965        let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
966        let mut table_names = Vec::new();
967        while let Some(table) = tables_stream.next().await {
968            // filter out physical tables
969            match table {
970                Ok(table) => {
971                    if table
972                        .table_info()
973                        .meta
974                        .options
975                        .extra_options
976                        .contains_key(PHYSICAL_TABLE_METADATA_KEY)
977                    {
978                        continue;
979                    }
980
981                    table_names.push(table.table_info().name.clone());
982                }
983                Err(e) => {
984                    return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
985                }
986            }
987        }
988        table_names.sort_unstable();
989        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
990    } else if label_name == FIELD_NAME_LABEL {
991        let field_columns = handle_schema_err!(
992            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
993        )
994        .unwrap_or_default();
995        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
996        field_columns.sort_unstable();
997        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
998    }
999
1000    let queries = params.matches.0;
1001    if queries.is_empty() {
1002        return PrometheusJsonResponse::error(
1003            StatusCode::InvalidArguments,
1004            "match[] parameter is required",
1005        );
1006    }
1007
1008    let start = params.start.unwrap_or_else(yesterday_rfc3339);
1009    let end = params.end.unwrap_or_else(current_time_rfc3339);
1010    let mut label_values = HashSet::new();
1011
1012    let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
1013        .context(ParseTimestampSnafu { timestamp: &start }));
1014    let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
1015        .context(ParseTimestampSnafu { timestamp: &end }));
1016
1017    for query in queries {
1018        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
1019        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
1020            return PrometheusJsonResponse::error(
1021                StatusCode::InvalidArguments,
1022                "expected vector selector",
1023            );
1024        };
1025        let Some(name) = take_metric_name(&mut vector_selector) else {
1026            return PrometheusJsonResponse::error(
1027                StatusCode::InvalidArguments,
1028                "expected metric name",
1029            );
1030        };
1031        let VectorSelector { matchers, .. } = vector_selector;
1032        // Only use and filter matchers.
1033        let matchers = matchers.matchers;
1034        let result = handler
1035            .query_label_values(
1036                name,
1037                label_name.to_string(),
1038                matchers,
1039                start,
1040                end,
1041                &query_ctx,
1042            )
1043            .await;
1044        if let Some(result) = handle_schema_err!(result) {
1045            label_values.extend(result.into_iter());
1046        }
1047    }
1048
1049    let mut label_values: Vec<_> = label_values.into_iter().collect();
1050    label_values.sort_unstable();
1051    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
1052}
1053
1054/// Take metric name from the [VectorSelector].
1055/// It takes the name in the selector or removes the name matcher.
1056fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1057    if let Some(name) = selector.name.take() {
1058        return Some(name);
1059    }
1060
1061    let (pos, matcher) = selector
1062        .matchers
1063        .matchers
1064        .iter()
1065        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1066    let name = matcher.value.clone();
1067    // We need to remove the name matcher to avoid using it as a filter in query.
1068    selector.matchers.matchers.remove(pos);
1069
1070    Some(name)
1071}
1072
1073async fn retrieve_field_names(
1074    query_ctx: &QueryContext,
1075    manager: CatalogManagerRef,
1076    matches: Vec<String>,
1077) -> Result<HashSet<String>> {
1078    let mut field_columns = HashSet::new();
1079    let catalog = query_ctx.current_catalog();
1080    let schema = query_ctx.current_schema();
1081
1082    if matches.is_empty() {
1083        // query all tables if no matcher is provided
1084        while let Some(table) = manager
1085            .tables(catalog, &schema, Some(query_ctx))
1086            .next()
1087            .await
1088        {
1089            let table = table.context(CatalogSnafu)?;
1090            for column in table.field_columns() {
1091                field_columns.insert(column.name);
1092            }
1093        }
1094        return Ok(field_columns);
1095    }
1096
1097    for table_name in matches {
1098        let table = manager
1099            .table(catalog, &schema, &table_name, Some(query_ctx))
1100            .await
1101            .context(CatalogSnafu)?
1102            .with_context(|| TableNotFoundSnafu {
1103                catalog: catalog.to_string(),
1104                schema: schema.to_string(),
1105                table: table_name.to_string(),
1106            })?;
1107
1108        for column in table.field_columns() {
1109            field_columns.insert(column.name);
1110        }
1111    }
1112    Ok(field_columns)
1113}
1114
1115/// Try to parse and extract the name of referenced metric from the promql query.
1116///
1117/// Returns the metric name if a single metric is referenced, otherwise None.
1118fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1119    let promql_expr = promql_parser::parser::parse(query).ok()?;
1120
1121    struct MetricNameVisitor {
1122        metric_name: Option<String>,
1123    }
1124
1125    impl promql_parser::util::ExprVisitor for MetricNameVisitor {
1126        type Error = ();
1127
1128        fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
1129            let query_metric_name = match plan {
1130                PromqlExpr::VectorSelector(vs) => vs
1131                    .matchers
1132                    .find_matchers(METRIC_NAME)
1133                    .into_iter()
1134                    .next()
1135                    .map(|m| m.value)
1136                    .or_else(|| vs.name.clone()),
1137                PromqlExpr::MatrixSelector(ms) => ms
1138                    .vs
1139                    .matchers
1140                    .find_matchers(METRIC_NAME)
1141                    .into_iter()
1142                    .next()
1143                    .map(|m| m.value)
1144                    .or_else(|| ms.vs.name.clone()),
1145                _ => return Ok(true),
1146            };
1147
1148            // set it to empty string if multiple metrics are referenced.
1149            if self.metric_name.is_some() && query_metric_name.is_some() {
1150                self.metric_name = Some(String::new());
1151            } else {
1152                self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
1153            }
1154
1155            Ok(true)
1156        }
1157    }
1158
1159    let mut visitor = MetricNameVisitor { metric_name: None };
1160    promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
1161    visitor.metric_name
1162}
1163
/// Parameters accepted by [`series_query`], read from both the query string
/// and the form body. For `start`, `end` and `lookback` the query-string value
/// takes precedence over the form value.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Start of the queried time range; defaults to `yesterday_rfc3339()`.
    start: Option<String>,
    /// End of the queried time range; defaults to `current_time_rfc3339()`.
    end: Option<String>,
    /// Lookback passed to the query; defaults to `DEFAULT_LOOKBACK_STRING`.
    lookback: Option<String>,
    /// Selectors gathered from the flattened remainder of the parameters; the
    /// handler refers to them as the `match[]` parameter. The form value is
    /// used only when the query string provides none.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database string used to resolve the catalog and schema.
    // NOTE(review): `series_query` only consults the query-string `db`; the
    // form-body value is not read there — confirm this is intended.
    db: Option<String>,
}
1173
1174#[axum_macros::debug_handler]
1175#[tracing::instrument(
1176    skip_all,
1177    fields(protocol = "prometheus", request_type = "series_query")
1178)]
1179pub async fn series_query(
1180    State(handler): State<PrometheusHandlerRef>,
1181    Query(params): Query<SeriesQuery>,
1182    Extension(mut query_ctx): Extension<QueryContext>,
1183    Form(form_params): Form<SeriesQuery>,
1184) -> PrometheusJsonResponse {
1185    let mut queries: Vec<String> = params.matches.0;
1186    if queries.is_empty() {
1187        queries = form_params.matches.0;
1188    }
1189    if queries.is_empty() {
1190        return PrometheusJsonResponse::error(
1191            StatusCode::Unsupported,
1192            "match[] parameter is required",
1193        );
1194    }
1195    let start = params
1196        .start
1197        .or(form_params.start)
1198        .unwrap_or_else(yesterday_rfc3339);
1199    let end = params
1200        .end
1201        .or(form_params.end)
1202        .unwrap_or_else(current_time_rfc3339);
1203    let lookback = params
1204        .lookback
1205        .or(form_params.lookback)
1206        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1207
1208    // update catalog and schema in query context if necessary
1209    if let Some(db) = &params.db {
1210        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1211        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1212    }
1213    let query_ctx = Arc::new(query_ctx);
1214
1215    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1216        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1217        .start_timer();
1218
1219    let mut series = Vec::new();
1220    let mut merge_map = HashMap::new();
1221    for query in queries {
1222        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1223        let prom_query = PromQuery {
1224            query,
1225            start: start.clone(),
1226            end: end.clone(),
1227            // TODO: find a better value for step
1228            step: DEFAULT_LOOKBACK_STRING.to_string(),
1229            lookback: lookback.clone(),
1230        };
1231        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1232
1233        handle_schema_err!(
1234            retrieve_series_from_query_result(
1235                result,
1236                &mut series,
1237                &query_ctx,
1238                &table_name,
1239                &handler.catalog_manager(),
1240                &mut merge_map,
1241            )
1242            .await
1243        );
1244    }
1245    let merge_map = merge_map
1246        .into_iter()
1247        .map(|(k, v)| (k, Value::from(v)))
1248        .collect();
1249    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1250    resp.resp_metrics = merge_map;
1251    resp
1252}
1253
/// Parameters accepted by [`parse_query`], read from both the query string
/// and the form body.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL text to parse; the query-string value takes precedence over
    /// the form value.
    query: Option<String>,
    // NOTE(review): `db` is not read by `parse_query` in the code visible
    // here — confirm whether it is still needed.
    db: Option<String>,
}
1259
1260#[axum_macros::debug_handler]
1261#[tracing::instrument(
1262    skip_all,
1263    fields(protocol = "prometheus", request_type = "parse_query")
1264)]
1265pub async fn parse_query(
1266    State(_handler): State<PrometheusHandlerRef>,
1267    Query(params): Query<ParseQuery>,
1268    Extension(_query_ctx): Extension<QueryContext>,
1269    Form(form_params): Form<ParseQuery>,
1270) -> PrometheusJsonResponse {
1271    if let Some(query) = params.query.or(form_params.query) {
1272        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1273        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1274    } else {
1275        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1276    }
1277}