servers/http/prometheus.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Implements the Prometheus-compliant HTTP API server.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_version::OwnedBuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::Float64Vector;
use futures::future::join_all;
use futures::StreamExt;
use itertools::Itertools;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::value::ValueType;
use promql_parser::parser::{
    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
    UnaryExpr, VectorSelector,
};
use query::parser::{PromQuery, QueryLanguageParser, DEFAULT_LOOKBACK_STRING};
use query::promql::planner::normalize_matcher;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use snafu::{Location, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
};

pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
    CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
    TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    pub metric: HashMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}

/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    pub metric: HashMap<String, String>,
    pub values: Vec<(f64, String)>,
}

/// Variants corresponding to [ValueType]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    Matrix(Vec<PromSeriesMatrix>),
    Vector(Vec<PromSeriesVector>),
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}

impl Default for PromQueryResult {
    fn default() -> Self {
        PromQueryResult::Matrix(Default::default())
    }
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: PromQueryResult,
}

#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    PromData(PromData),
    Labels(Vec<String>),
    Series(Vec<HashMap<String, String>>),
    LabelValues(Vec<String>),
    FormatQuery(String),
    BuildInfo(OwnedBuildInfo),
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    #[default]
    None,
}

impl PrometheusResponse {
    /// Appends another [`PrometheusResponse`] to this one.
    /// # NOTE
    ///   Only matrix and vector results are appended; any other response is ignored.
    fn append(&mut self, other: PrometheusResponse) {
        match (self, other) {
            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Matrix(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }

            (
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(lhs),
                    ..
                }),
                PrometheusResponse::PromData(PromData {
                    result: PromQueryResult::Vector(rhs),
                    ..
                }),
            ) => {
                lhs.extend(rhs);
            }
            _ => {
                // TODO(dennis): process other cases?
            }
        }
    }

    pub fn is_none(&self) -> bool {
        matches!(self, PrometheusResponse::None)
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    query: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "format_query")
)]
pub async fn format_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    let query = params.query.or(form_params.query).unwrap_or_default();
    match promql_parser::parser::parse(&query) {
        Ok(expr) => {
            let pretty = expr.prettify();
            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
        }
        Err(reason) => {
            let err = InvalidQuerySnafu { reason }.build();
            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "build_info_query")
)]
pub async fn build_info_query() -> PrometheusJsonResponse {
    let build_info = common_version::build_info().clone();
    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    query: Option<String>,
    lookback: Option<String>,
    time: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

/// Helper macro that tries to evaluate the expression and returns its result.
/// If the evaluation fails, it returns an error `PrometheusJsonResponse` early.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
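// Usage sketch (as in the handlers below): any fallible call whose error should surface
// as an `InvalidArguments` JSON response can be unwrapped in one line, e.g.
// `let expr = try_call_return_response!(promql_parser::parser::parse(&query));`.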

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "instant_query")
)]
pub async fn instant_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<InstantQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<InstantQuery>,
) -> PrometheusJsonResponse {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
        .start_timer();

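    // When the query selects metrics only through non-equality `__name__` matchers
    // (e.g. a regex such as `{__name__=~"up.*"}`), resolve the matching metric names
    // first, run one instant query per metric, and merge the per-metric responses.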
    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            let result_type = promql_expr.value_type();

            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: result_type.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_instant_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one response, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_instant_query(&handler, &prom_query, query_ctx).await
    }
}

/// Executes a single instant query and returns the response
async fn do_instant_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    query: Option<String>,
    start: Option<String>,
    end: Option<String>,
    step: Option<String>,
    lookback: Option<String>,
    timeout: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "range_query")
)]
pub async fn range_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<RangeQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<RangeQuery>,
) -> PrometheusJsonResponse {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
        lookback: params
            .lookback
            .or(form_params.lookback)
            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
    };

    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);
    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
        .start_timer();

    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
        && !name_matchers.is_empty()
    {
        debug!("Find metric name matchers: {:?}", name_matchers);

        let metric_names =
            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);

        debug!("Find metric names: {:?}", metric_names);

        if metric_names.is_empty() {
            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
                result_type: ValueType::Matrix.to_string(),
                ..Default::default()
            }));
        }

        let responses = join_all(metric_names.into_iter().map(|metric| {
            let mut prom_query = prom_query.clone();
            let mut promql_expr = promql_expr.clone();
            let query_ctx = query_ctx.clone();
            let handler = handler.clone();

            async move {
                update_metric_name_matcher(&mut promql_expr, &metric);
                let new_query = promql_expr.to_string();
                debug!(
                    "Updated promql, before: {}, after: {}",
                    &prom_query.query, new_query
                );
                prom_query.query = new_query;

                do_range_query(&handler, &prom_query, query_ctx).await
            }
        }))
        .await;

        // Safety: at least one response, checked above
        responses
            .into_iter()
            .reduce(|mut acc, resp| {
                acc.data.append(resp.data);
                acc
            })
            .unwrap()
    } else {
        do_range_query(&handler, &prom_query, query_ctx).await
    }
}

/// Executes a single range query and returns the response
async fn do_range_query(
    handler: &PrometheusHandlerRef,
    prom_query: &PromQuery,
    query_ctx: QueryContextRef,
) -> PrometheusJsonResponse {
    let result = handler.do_query(prom_query, query_ctx).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
        Ok((metric_name, _)) => metric_name.unwrap_or_default(),
    };
    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}

#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

// Custom `Deserialize` implementation to support parsing repeated `match[]` parameters
450impl<'de> Deserialize<'de> for Matches {
451    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
452    where
453        D: de::Deserializer<'de>,
454    {
455        struct MatchesVisitor;
456
457        impl<'d> Visitor<'d> for MatchesVisitor {
458            type Value = Vec<String>;
459
460            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
461                formatter.write_str("a string")
462            }
463
464            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
465            where
466                M: MapAccess<'d>,
467            {
468                let mut matches = Vec::new();
469                while let Some((key, value)) = access.next_entry::<String, String>()? {
470                    if key == "match[]" {
471                        matches.push(value);
472                    }
473                }
474                Ok(matches)
475            }
476        }
477        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
478    }
479}
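// For example, a request like `...?match[]=up&match[]=process_start_time_seconds`
// deserializes into `Matches(vec!["up".to_string(), "process_start_time_seconds".to_string()])`;
// any query or form parameter other than `match[]` is ignored by this visitor.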

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "labels_query")
)]
pub async fn labels_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<LabelsQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
        .start_timer();

    // Fetch all tag columns. It will be used as a white-list for tag names.
    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
    {
        Ok(labels) => labels,
        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
    };
    // insert the special metric name label
    let _ = labels.insert(METRIC_NAME.to_string());

    // Fetch all columns if no query matcher is provided
    if queries.is_empty() {
        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
        labels_vec.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
    }

    // Otherwise, run queries and extract column names from the result set.
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    let mut fetched_labels = HashSet::new();
    let _ = fetched_labels.insert(METRIC_NAME.to_string());

    let mut merge_map = HashMap::new();
    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
        };

        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        if let Err(err) =
            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
                .await
        {
            // Prometheus doesn't report an error when querying a nonexistent label or metric
            if err.status_code() != StatusCode::TableNotFound
                && err.status_code() != StatusCode::TableColumnNotFound
            {
                return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
            }
        }
    }

    // intersect `fetched_labels` with `labels` to filter out non-tag columns
    fetched_labels.retain(|l| labels.contains(l));
    let _ = labels.insert(METRIC_NAME.to_string());

    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
    sorted_labels.sort();
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
    resp.resp_metrics = merge_map;
    resp
}

/// Get all tag column names of the given schema
async fn get_all_column_names(
    catalog: &str,
    schema: &str,
    manager: &CatalogManagerRef,
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
    let table_names = manager.table_names(catalog, schema, None).await?;

    let mut labels = HashSet::new();
    for table_name in table_names {
        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
            continue;
        };
        for column in table.primary_key_columns() {
            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
            {
                labels.insert(column.name);
            }
        }
    }

    Ok(labels)
}

async fn retrieve_series_from_query_result(
    result: Result<Output>,
    series: &mut Vec<HashMap<String, String>>,
    query_ctx: &QueryContext,
    table_name: &str,
    manager: &CatalogManagerRef,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;

    // fetch tag list
    let table = manager
        .table(
            query_ctx.current_catalog(),
            &query_ctx.current_schema(),
            table_name,
            Some(query_ctx),
        )
        .await
        .context(CatalogSnafu)?
        .with_context(|| TableNotFoundSnafu {
            catalog: query_ctx.current_catalog(),
            schema: query_ctx.current_schema(),
            table: table_name,
        })?;
    let tag_columns = table
        .primary_key_columns()
        .map(|c| c.name)
        .collect::<HashSet<_>>();

    match result.data {
        OutputData::RecordBatches(batches) => {
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_series(batches, series, table_name, &tag_columns)
        }
        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
            reason: "expected data result, but got affected rows".to_string(),
            location: Location::default(),
        }),
    }?;

    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

/// Retrieve label names from the query result
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
    metrics: &mut HashMap<String, u64>,
) -> Result<()> {
    let result = result?;
    match result.data {
        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
        OutputData::Stream(stream) => {
            let batches = RecordBatches::try_collect(stream)
                .await
                .context(CollectRecordbatchSnafu)?;
            record_batches_to_labels_name(batches, labels)
        }
        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
            reason: "expected data result, but got affected rows".to_string(),
        }
        .fail(),
    }?;
    if let Some(ref plan) = result.meta.plan {
        collect_plan_metrics(plan, &mut [metrics]);
    }
    Ok(())
}

fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<String, String>>,
    table_name: &str,
    tag_columns: &HashSet<String>,
) -> Result<()> {
    for batch in batches.iter() {
        // project the record batch so it contains only tag columns
        let projection = batch
            .schema
            .column_schemas()
            .iter()
            .enumerate()
            .filter_map(|(idx, col)| {
                if tag_columns.contains(&col.name) {
                    Some(idx)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let batch = batch
            .try_project(&projection)
            .context(CollectRecordbatchSnafu)?;

        for row in batch.rows() {
            let mut element: HashMap<String, String> = row
                .iter()
                .enumerate()
                .map(|(idx, column)| {
                    let column_name = batch.schema.column_name_by_index(idx);
                    (column_name.to_string(), column.to_string())
                })
                .collect();
            let _ = element.insert("__name__".to_string(), table_name.to_string());
            series.push(element);
        }
    }
    Ok(())
}

/// Retrieve label names from record batches
fn record_batches_to_labels_name(
    batches: RecordBatches,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let mut column_indices = Vec::new();
    let mut field_column_indices = Vec::new();
    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
        if let ConcreteDataType::Float64(_) = column.data_type {
            field_column_indices.push(i);
        }
        column_indices.push(i);
    }

    if field_column_indices.is_empty() {
        return Err(Error::Internal {
            err_msg: "no value column found".to_string(),
        });
    }

    for batch in batches.iter() {
        let names = column_indices
            .iter()
            .map(|c| batches.schema().column_name_by_index(*c).to_string())
            .collect::<Vec<_>>();

        let field_columns = field_column_indices
            .iter()
            .map(|i| {
                batch
                    .column(*i)
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
            })
            .collect::<Vec<_>>();

        for row_index in 0..batch.num_rows() {
            // if all field columns are null, skip this row
            if field_columns
                .iter()
                .all(|c| c.get_data(row_index).is_none())
            {
                continue;
            }

            // if a field is not null, record the column names as labels and return
            names.iter().for_each(|name| {
                let _ = labels.insert(name.to_string());
            });
            return Ok(());
        }
    }
    Ok(())
}

pub(crate) fn retrieve_metric_name_and_result_type(
    promql: &str,
) -> Result<(Option<String>, ValueType)> {
    let promql_expr = promql_parser::parser::parse(promql)
        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
    let metric_name = promql_expr_to_metric_name(&promql_expr);
    let result_type = promql_expr.value_type();

    Ok((metric_name, result_type))
}

/// Tries to get the catalog and schema from an optional `db` param, falling back
/// to the ones in the [QueryContext] if the param is not present.
pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
    if let Some(db) = db {
        parse_catalog_and_schema_from_db_string(db)
    } else {
        (
            ctx.current_catalog().to_string(),
            ctx.current_schema().to_string(),
        )
    }
}

/// Update catalog and schema in [QueryContext] if necessary.
pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
        ctx.set_current_catalog(catalog);
        ctx.set_current_schema(schema);
    }
}

fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        name.clone().or(matchers
            .find_matchers(METRIC_NAME)
            .into_iter()
            .next()
            .map(|m| m.value))
    })
}
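// For example, both `http_requests_total{job="api"}` and `{__name__="http_requests_total"}`
// resolve to `Some("http_requests_total")`, while a pure literal expression such as `1 + 1`
// yields `None`.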

fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
where
    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
{
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
        }
        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
        PromqlExpr::NumberLiteral(_) => None,
        PromqlExpr::StringLiteral(_) => None,
        PromqlExpr::Extension(_) => None,
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;

            f(name, matchers)
        }
        PromqlExpr::Call(Call { args, .. }) => args
            .args
            .iter()
            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
    }
}

/// Try to find the `__name__` matchers whose op is not `MatchOp::Equal`.
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        // Has name, ignore the matchers
        if name.is_some() {
            return None;
        }

        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .map(normalize_matcher)
            .collect::<Vec<_>>()
    })
}
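// For example, for `{__name__=~"node_.*"}` this returns the regex matcher on `__name__`,
// so the handlers above expand the query per matching metric. For `up{job="api"}` (explicit
// metric name) it returns `None`, and for `{__name__="up"}` (equality matcher) it returns an
// empty vector; in both cases the callers skip the expansion.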

/// Rewrite the `__name__` matchers in the expression in place, turning them into
/// equality matchers on the given `metric_name`.
fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            update_metric_name_matcher(lhs, metric_name);
            update_metric_name_matcher(rhs, metric_name);
        }
        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
            update_metric_name_matcher(expr, metric_name)
        }
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if name.is_some() {
                return;
            }

            for m in &mut matchers.matchers {
                if m.name == METRIC_NAME {
                    m.op = MatchOp::Equal;
                    m.value = metric_name.to_string();
                }
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args.iter_mut().for_each(|e| {
                update_metric_name_matcher(e, metric_name);
            });
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
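// For example, with `metric_name = "node_cpu_seconds_total"`, the selector in
// `rate({__name__=~"node_.*"}[5m])` is rewritten in place so the query becomes
// `rate({__name__="node_cpu_seconds_total"}[5m])` before being executed.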

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();
        let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx));
        let mut table_names = Vec::new();
        while let Some(table) = tables_stream.next().await {
            // filter out physical tables
            match table {
                Ok(table) => {
                    if table
                        .table_info()
                        .meta
                        .options
                        .extra_options
                        .contains_key(PHYSICAL_TABLE_METADATA_KEY)
                    {
                        continue;
                    }

                    table_names.push(table.table_info().name.clone());
                }
                Err(e) => {
                    return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
                }
            }
        }
        table_names.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        let field_columns =
            match retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0)
                .await
            {
                Ok(table_names) => table_names,
                Err(e) => {
                    return PrometheusJsonResponse::error(e.status_code(), e.output_msg());
                }
            };
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    }

    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&start)
        .context(ParseTimestampSnafu { timestamp: &start }));
    let end = try_call_return_response!(QueryLanguageParser::parse_promql_timestamp(&end)
        .context(ParseTimestampSnafu { timestamp: &end }));

    for query in queries {
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use the AND filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(
                name,
                label_name.to_string(),
                matchers,
                start,
                end,
                &query_ctx,
            )
            .await;

        match result {
            Ok(result) => {
                label_values.extend(result.into_iter());
            }
            Err(err) => {
                // Prometheus doesn't report an error when querying a nonexistent label or metric
                if err.status_code() != StatusCode::TableNotFound
                    && err.status_code() != StatusCode::TableColumnNotFound
                {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    }

    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}

/// Takes the metric name from the [VectorSelector].
/// It returns the selector's name if present; otherwise it extracts the value of the
/// `__name__` equality matcher and removes that matcher from the selector.
fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
    if let Some(name) = selector.name.take() {
        return Some(name);
    }

    let (pos, matcher) = selector
        .matchers
        .matchers
        .iter()
        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
    let name = matcher.value.clone();
    // We need to remove the name matcher to avoid using it as a filter in query.
    selector.matchers.matchers.remove(pos);

    Some(name)
}
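// For example, `up{job="api"}` yields `Some("up")` and leaves the `job` matcher intact,
// while `{__name__="up", job="api"}` also yields `Some("up")` and drops the `__name__`
// matcher so it is not applied as a label filter.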

async fn retrieve_field_names(
    query_ctx: &QueryContext,
    manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<HashSet<String>> {
    let mut field_columns = HashSet::new();
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    if matches.is_empty() {
        // query all tables if no matcher is provided
        while let Some(table) = manager
            .tables(catalog, &schema, Some(query_ctx))
            .next()
            .await
        {
            let table = table.context(CatalogSnafu)?;
            for column in table.field_columns() {
                field_columns.insert(column.name);
            }
        }
        return Ok(field_columns);
    }

    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.to_string(),
                table: table_name.to_string(),
            })?;

        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
    }
    Ok(field_columns)
}

/// Try to parse and extract the name of the referenced metric from the PromQL query.
///
/// Returns the metric name if exactly one selector references a metric, an empty string
/// if more than one does, and `None` if no metric is referenced at all.
fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
    let promql_expr = promql_parser::parser::parse(query).ok()?;

    struct MetricNameVisitor {
        metric_name: Option<String>,
    }

    impl promql_parser::util::ExprVisitor for MetricNameVisitor {
        type Error = ();

        fn pre_visit(&mut self, plan: &PromqlExpr) -> std::result::Result<bool, Self::Error> {
            let query_metric_name = match plan {
                PromqlExpr::VectorSelector(vs) => vs
                    .matchers
                    .find_matchers(METRIC_NAME)
                    .into_iter()
                    .next()
                    .map(|m| m.value)
                    .or_else(|| vs.name.clone()),
                PromqlExpr::MatrixSelector(ms) => ms
                    .vs
                    .matchers
                    .find_matchers(METRIC_NAME)
                    .into_iter()
                    .next()
                    .map(|m| m.value)
                    .or_else(|| ms.vs.name.clone()),
                _ => return Ok(true),
            };

            // set it to empty string if multiple metrics are referenced.
            if self.metric_name.is_some() && query_metric_name.is_some() {
                self.metric_name = Some(String::new());
            } else {
                self.metric_name = query_metric_name.or_else(|| self.metric_name.clone());
            }

            Ok(true)
        }
    }

    let mut visitor = MetricNameVisitor { metric_name: None };
    promql_parser::util::walk_expr(&mut visitor, &promql_expr).ok()?;
    visitor.metric_name
}
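// For example, `rate(http_requests_total[5m])` yields `Some("http_requests_total")`,
// `http_requests_total / http_requests_failed` yields `Some("")` because two selectors
// carry metric names, and `1 + 1` yields `None`.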

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    start: Option<String>,
    end: Option<String>,
    lookback: Option<String>,
    #[serde(flatten)]
    matches: Matches,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "series_query")
)]
pub async fn series_query(
    State(handler): State<PrometheusHandlerRef>,
    Query(params): Query<SeriesQuery>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Form(form_params): Form<SeriesQuery>,
) -> PrometheusJsonResponse {
    let mut queries: Vec<String> = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::Unsupported,
            "match[] parameter is required",
        );
    }
    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);
    let lookback = params
        .lookback
        .or(form_params.lookback)
        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

    // update catalog and schema in query context if necessary
    if let Some(db) = &params.db {
        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    }
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
        .start_timer();

    let mut series = Vec::new();
    let mut merge_map = HashMap::new();
    for query in queries {
        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            // TODO: find a better value for step
            step: DEFAULT_LOOKBACK_STRING.to_string(),
            lookback: lookback.clone(),
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;

        if let Err(err) = retrieve_series_from_query_result(
            result,
            &mut series,
            &query_ctx,
            &table_name,
            &handler.catalog_manager(),
            &mut merge_map,
        )
        .await
        {
            return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
        }
    }
    let merge_map = merge_map
        .into_iter()
        .map(|(k, v)| (k, Value::from(v)))
        .collect();
    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
    resp.resp_metrics = merge_map;
    resp
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    query: Option<String>,
    db: Option<String>,
}

#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "parse_query")
)]
pub async fn parse_query(
    State(_handler): State<PrometheusHandlerRef>,
    Query(params): Query<ParseQuery>,
    Extension(_query_ctx): Extension<QueryContext>,
    Form(form_params): Form<ParseQuery>,
) -> PrometheusJsonResponse {
    if let Some(query) = params.query.or(form_params.query) {
        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
    } else {
        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
    }
}