// servers/http/prometheus.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! prom supply the prometheus HTTP API Server compliance
16
17use std::borrow::Borrow;
18use std::collections::{BTreeMap, HashMap, HashSet};
19use std::hash::{Hash, Hasher};
20use std::sync::Arc;
21
22use arrow::array::AsArray;
23use arrow::datatypes::{
24    Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
25    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
26    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
27    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
28    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
29    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
30};
31use arrow_schema::{DataType, IntervalUnit, TimeUnit};
32use axum::extract::{Path, Query, State};
33use axum::{Extension, Form};
34use catalog::CatalogManagerRef;
35use common_catalog::parse_catalog_and_schema_from_db_string;
36use common_decimal::Decimal128;
37use common_error::ext::ErrorExt;
38use common_error::status_code::StatusCode;
39use common_query::{Output, OutputData};
40use common_recordbatch::{RecordBatch, RecordBatches};
41use common_telemetry::{debug, tracing};
42use common_time::time::Time;
43use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
44use common_time::{
45    Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
46};
47use common_version::OwnedBuildInfo;
48use datafusion_common::ScalarValue;
49use datatypes::prelude::ConcreteDataType;
50use datatypes::scalars::ScalarVector;
51use datatypes::schema::{ColumnSchema, SchemaRef};
52use datatypes::types::jsonb_to_string;
53use datatypes::vectors::Float64Vector;
54use futures::StreamExt;
55use futures::future::join_all;
56use itertools::Itertools;
57use promql_parser::label::{METRIC_NAME, MatchOp, Matcher, Matchers};
58use promql_parser::parser::token::{self};
59use promql_parser::parser::value::ValueType;
60use promql_parser::parser::{
61    AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, LabelModifier, MatrixSelector, ParenExpr,
62    SubqueryExpr, UnaryExpr, VectorSelector,
63};
64use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser};
65use query::promql::planner::normalize_matcher;
66use serde::de::{self, MapAccess, Visitor};
67use serde::{Deserialize, Serialize};
68use serde_json::Value;
69use session::context::{QueryContext, QueryContextRef};
70use snafu::{Location, OptionExt, ResultExt};
71use store_api::metric_engine_consts::{
72    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
73};
74
75pub use super::result::prometheus_resp::PrometheusJsonResponse;
76use crate::error::{
77    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
78    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
79    UnexpectedResultSnafu,
80};
81use crate::http::header::collect_plan_metrics;
82use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
83use crate::prometheus_handler::PrometheusHandlerRef;
84
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
    /// Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    /// The single sample as `(timestamp, value-rendered-as-string)`,
    /// matching the Prometheus JSON sample encoding. Omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<(f64, String)>,
}
92
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
    /// Label name -> label value pairs identifying this series.
    pub metric: BTreeMap<String, String>,
    /// All samples of the series as `(timestamp, value-rendered-as-string)` pairs.
    pub values: Vec<(f64, String)>,
}
99
/// Variants corresponding to [ValueType]
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PromQueryResult {
    /// Range-query result: each series carries multiple samples.
    Matrix(Vec<PromSeriesMatrix>),
    /// Instant-query result: each series carries at most one sample.
    Vector(Vec<PromSeriesVector>),
    /// A single `(timestamp, value)` sample.
    Scalar(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
    /// A single `(timestamp, string)` sample.
    String(#[serde(skip_serializing_if = "Option::is_none")] Option<(f64, String)>),
}
109
110impl Default for PromQueryResult {
111    fn default() -> Self {
112        PromQueryResult::Matrix(Default::default())
113    }
114}
115
/// The `data` object of a Prometheus-compatible JSON query response.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromData {
    /// The rendered [ValueType] of the result (`value_type().to_string()`).
    #[serde(rename = "resultType")]
    pub result_type: String,
    /// The query result; its variant should agree with `result_type`.
    pub result: PromQueryResult,
}
122
123/// A "holder" for the reference([Arc]) to a column name,
124/// to help avoiding cloning [String]s when used as a [HashMap] key.
125#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
126pub struct Column(Arc<String>);
127
128impl From<&str> for Column {
129    fn from(s: &str) -> Self {
130        Self(Arc::new(s.to_string()))
131    }
132}
133
/// Payload variants for the Prometheus HTTP API endpoints in this module.
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
    /// Result of an instant/range query.
    PromData(PromData),
    /// Label names, for the labels endpoint.
    Labels(Vec<String>),
    /// Series descriptors (label name -> label value maps).
    Series(Vec<HashMap<Column, String>>),
    /// Values of a single label.
    LabelValues(Vec<String>),
    /// Pretty-printed PromQL, for the format_query endpoint.
    FormatQuery(String),
    /// Server build information.
    BuildInfo(OwnedBuildInfo),
    /// Parsed PromQL AST (serialize-only).
    #[serde(skip_deserializing)]
    ParseResult(promql_parser::parser::Expr),
    /// Empty response.
    #[default]
    None,
}
148
149impl PrometheusResponse {
150    /// Append the other [`PrometheusResponse]`.
151    /// # NOTE
152    ///   Only append matrix and vector results, otherwise just ignore the other response.
153    fn append(&mut self, other: PrometheusResponse) {
154        match (self, other) {
155            (
156                PrometheusResponse::PromData(PromData {
157                    result: PromQueryResult::Matrix(lhs),
158                    ..
159                }),
160                PrometheusResponse::PromData(PromData {
161                    result: PromQueryResult::Matrix(rhs),
162                    ..
163                }),
164            ) => {
165                lhs.extend(rhs);
166            }
167
168            (
169                PrometheusResponse::PromData(PromData {
170                    result: PromQueryResult::Vector(lhs),
171                    ..
172                }),
173                PrometheusResponse::PromData(PromData {
174                    result: PromQueryResult::Vector(rhs),
175                    ..
176                }),
177            ) => {
178                lhs.extend(rhs);
179            }
180            _ => {
181                // TODO(dennis): process other cases?
182            }
183        }
184    }
185
186    pub fn is_none(&self) -> bool {
187        matches!(self, PrometheusResponse::None)
188    }
189}
190
/// Parameters accepted by the format-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FormatQuery {
    /// The PromQL expression to pretty-print.
    query: Option<String>,
}
195
196#[axum_macros::debug_handler]
197#[tracing::instrument(
198    skip_all,
199    fields(protocol = "prometheus", request_type = "format_query")
200)]
201pub async fn format_query(
202    State(_handler): State<PrometheusHandlerRef>,
203    Query(params): Query<InstantQuery>,
204    Extension(_query_ctx): Extension<QueryContext>,
205    Form(form_params): Form<InstantQuery>,
206) -> PrometheusJsonResponse {
207    let query = params.query.or(form_params.query).unwrap_or_default();
208    match promql_parser::parser::parse(&query) {
209        Ok(expr) => {
210            let pretty = expr.prettify();
211            PrometheusJsonResponse::success(PrometheusResponse::FormatQuery(pretty))
212        }
213        Err(reason) => {
214            let err = InvalidQuerySnafu { reason }.build();
215            PrometheusJsonResponse::error(err.status_code(), err.output_msg())
216        }
217    }
218}
219
/// Parameters accepted by the build-info endpoint (none).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BuildInfoQuery {}
222
223#[axum_macros::debug_handler]
224#[tracing::instrument(
225    skip_all,
226    fields(protocol = "prometheus", request_type = "build_info_query")
227)]
228pub async fn build_info_query() -> PrometheusJsonResponse {
229    let build_info = common_version::build_info().clone();
230    PrometheusJsonResponse::success(PrometheusResponse::BuildInfo(build_info.into()))
231}
232
/// Query-string/form parameters accepted by the instant-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct InstantQuery {
    /// The PromQL expression to evaluate.
    query: Option<String>,
    /// Lookback delta; defaults to [DEFAULT_LOOKBACK_STRING] when absent.
    lookback: Option<String>,
    /// Evaluation timestamp; defaults to the current server time.
    time: Option<String>,
    /// Evaluation timeout (accepted for API compatibility; not read by the
    /// handlers visible in this file).
    timeout: Option<String>,
    /// Target database string, parsed into catalog and schema.
    db: Option<String>,
}
241
/// Helper macro which try to evaluate the expression and return its results.
/// If the evaluation fails, return a `PrometheusJsonResponse` early.
///
/// Errors are reported as [StatusCode::InvalidArguments] since this macro is
/// used to validate user-supplied input (e.g. PromQL parsing). The `return`
/// means it may only be used inside functions returning `PrometheusJsonResponse`.
macro_rules! try_call_return_response {
    ($handle: expr) => {
        match $handle {
            Ok(res) => res,
            Err(err) => {
                // Render the error message into the response body and
                // short-circuit the surrounding handler.
                let msg = err.to_string();
                return PrometheusJsonResponse::error(StatusCode::InvalidArguments, msg);
            }
        }
    };
}
255
256#[axum_macros::debug_handler]
257#[tracing::instrument(
258    skip_all,
259    fields(protocol = "prometheus", request_type = "instant_query")
260)]
261pub async fn instant_query(
262    State(handler): State<PrometheusHandlerRef>,
263    Query(params): Query<InstantQuery>,
264    Extension(mut query_ctx): Extension<QueryContext>,
265    Form(form_params): Form<InstantQuery>,
266) -> PrometheusJsonResponse {
267    // Extract time from query string, or use current server time if not specified.
268    let time = params
269        .time
270        .or(form_params.time)
271        .unwrap_or_else(current_time_rfc3339);
272    let prom_query = PromQuery {
273        query: params.query.or(form_params.query).unwrap_or_default(),
274        start: time.clone(),
275        end: time,
276        step: "1s".to_string(),
277        lookback: params
278            .lookback
279            .or(form_params.lookback)
280            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
281        alias: None,
282    };
283
284    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
285
286    // update catalog and schema in query context if necessary
287    if let Some(db) = &params.db {
288        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
289        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
290    }
291    let query_ctx = Arc::new(query_ctx);
292
293    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
294        .with_label_values(&[query_ctx.get_db_string().as_str(), "instant_query"])
295        .start_timer();
296
297    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
298        && !name_matchers.is_empty()
299    {
300        debug!("Find metric name matchers: {:?}", name_matchers);
301
302        let metric_names =
303            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
304
305        debug!("Find metric names: {:?}", metric_names);
306
307        if metric_names.is_empty() {
308            let result_type = promql_expr.value_type();
309
310            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
311                result_type: result_type.to_string(),
312                ..Default::default()
313            }));
314        }
315
316        let responses = join_all(metric_names.into_iter().map(|metric| {
317            let mut prom_query = prom_query.clone();
318            let mut promql_expr = promql_expr.clone();
319            let query_ctx = query_ctx.clone();
320            let handler = handler.clone();
321
322            async move {
323                update_metric_name_matcher(&mut promql_expr, &metric);
324                let new_query = promql_expr.to_string();
325                debug!(
326                    "Updated promql, before: {}, after: {}",
327                    &prom_query.query, new_query
328                );
329                prom_query.query = new_query;
330
331                do_instant_query(&handler, &prom_query, query_ctx).await
332            }
333        }))
334        .await;
335
336        responses
337            .into_iter()
338            .reduce(|mut acc, resp| {
339                acc.data.append(resp.data);
340                acc
341            })
342            .unwrap()
343    } else {
344        do_instant_query(&handler, &prom_query, query_ctx).await
345    }
346}
347
348/// Executes a single instant query and returns response
349async fn do_instant_query(
350    handler: &PrometheusHandlerRef,
351    prom_query: &PromQuery,
352    query_ctx: QueryContextRef,
353) -> PrometheusJsonResponse {
354    let result = handler.do_query(prom_query, query_ctx).await;
355    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
356        Ok((metric_name, result_type)) => (metric_name, result_type),
357        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
358    };
359    PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
360}
361
/// Query-string/form parameters accepted by the range-query endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RangeQuery {
    /// The PromQL expression to evaluate.
    query: Option<String>,
    /// Start of the evaluation range.
    start: Option<String>,
    /// End of the evaluation range.
    end: Option<String>,
    /// Resolution step between evaluation points.
    step: Option<String>,
    /// Lookback delta; defaults to [DEFAULT_LOOKBACK_STRING] when absent.
    lookback: Option<String>,
    /// Evaluation timeout (accepted for API compatibility; not read by the
    /// handlers visible in this file).
    timeout: Option<String>,
    /// Target database string, parsed into catalog and schema.
    db: Option<String>,
}
372
373#[axum_macros::debug_handler]
374#[tracing::instrument(
375    skip_all,
376    fields(protocol = "prometheus", request_type = "range_query")
377)]
378pub async fn range_query(
379    State(handler): State<PrometheusHandlerRef>,
380    Query(params): Query<RangeQuery>,
381    Extension(mut query_ctx): Extension<QueryContext>,
382    Form(form_params): Form<RangeQuery>,
383) -> PrometheusJsonResponse {
384    let prom_query = PromQuery {
385        query: params.query.or(form_params.query).unwrap_or_default(),
386        start: params.start.or(form_params.start).unwrap_or_default(),
387        end: params.end.or(form_params.end).unwrap_or_default(),
388        step: params.step.or(form_params.step).unwrap_or_default(),
389        lookback: params
390            .lookback
391            .or(form_params.lookback)
392            .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
393        alias: None,
394    };
395
396    let promql_expr = try_call_return_response!(promql_parser::parser::parse(&prom_query.query));
397
398    // update catalog and schema in query context if necessary
399    if let Some(db) = &params.db {
400        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
401        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
402    }
403    let query_ctx = Arc::new(query_ctx);
404    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
405        .with_label_values(&[query_ctx.get_db_string().as_str(), "range_query"])
406        .start_timer();
407
408    if let Some(name_matchers) = find_metric_name_not_equal_matchers(&promql_expr)
409        && !name_matchers.is_empty()
410    {
411        debug!("Find metric name matchers: {:?}", name_matchers);
412
413        let metric_names =
414            try_call_return_response!(handler.query_metric_names(name_matchers, &query_ctx).await);
415
416        debug!("Find metric names: {:?}", metric_names);
417
418        if metric_names.is_empty() {
419            return PrometheusJsonResponse::success(PrometheusResponse::PromData(PromData {
420                result_type: ValueType::Matrix.to_string(),
421                ..Default::default()
422            }));
423        }
424
425        let responses = join_all(metric_names.into_iter().map(|metric| {
426            let mut prom_query = prom_query.clone();
427            let mut promql_expr = promql_expr.clone();
428            let query_ctx = query_ctx.clone();
429            let handler = handler.clone();
430
431            async move {
432                update_metric_name_matcher(&mut promql_expr, &metric);
433                let new_query = promql_expr.to_string();
434                debug!(
435                    "Updated promql, before: {}, after: {}",
436                    &prom_query.query, new_query
437                );
438                prom_query.query = new_query;
439
440                do_range_query(&handler, &prom_query, query_ctx).await
441            }
442        }))
443        .await;
444
445        // Safety: at least one responses, checked above
446        responses
447            .into_iter()
448            .reduce(|mut acc, resp| {
449                acc.data.append(resp.data);
450                acc
451            })
452            .unwrap()
453    } else {
454        do_range_query(&handler, &prom_query, query_ctx).await
455    }
456}
457
458/// Executes a single range query and returns response
459async fn do_range_query(
460    handler: &PrometheusHandlerRef,
461    prom_query: &PromQuery,
462    query_ctx: QueryContextRef,
463) -> PrometheusJsonResponse {
464    let result = handler.do_query(prom_query, query_ctx).await;
465    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
466        Err(err) => return PrometheusJsonResponse::error(err.status_code(), err.output_msg()),
467        Ok((metric_name, _)) => metric_name,
468    };
469    PrometheusJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
470}
471
/// Collected values of the repeated `match[]` series-selector parameter.
/// Deserialization is custom — see the `Deserialize` impl for this type.
#[derive(Debug, Default, Serialize)]
struct Matches(Vec<String>);
474
/// Query-string/form parameters accepted by the labels endpoint.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelsQuery {
    /// Start of the time range; defaults to yesterday.
    start: Option<String>,
    /// End of the time range; defaults to the current server time.
    end: Option<String>,
    /// Lookback delta; defaults to [DEFAULT_LOOKBACK_STRING] when absent.
    lookback: Option<String>,
    /// Repeated `match[]` series selectors, captured from the flattened map.
    #[serde(flatten)]
    matches: Matches,
    /// Target database string, parsed into catalog and schema.
    db: Option<String>,
}
484
485// Custom Deserialize method to support parsing repeated match[]
486impl<'de> Deserialize<'de> for Matches {
487    fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
488    where
489        D: de::Deserializer<'de>,
490    {
491        struct MatchesVisitor;
492
493        impl<'d> Visitor<'d> for MatchesVisitor {
494            type Value = Vec<String>;
495
496            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
497                formatter.write_str("a string")
498            }
499
500            fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
501            where
502                M: MapAccess<'d>,
503            {
504                let mut matches = Vec::new();
505                while let Some((key, value)) = access.next_entry::<String, String>()? {
506                    if key == "match[]" {
507                        matches.push(value);
508                    }
509                }
510                Ok(matches)
511            }
512        }
513        Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
514    }
515}
516
/// Handles schema errors, transforming a Result into an Option.
/// - If the input is `Ok(v)`, returns `Some(v)`
/// - If the input is `Err(err)` and the error status code is `TableNotFound` or
///   `TableColumnNotFound`, returns `None` (ignoring these specific errors)
/// - If the input is `Err(err)` with any other error code, directly returns a
///   `PrometheusJsonResponse::error`.
///
/// NOTE: the `return` means this macro may only be used inside functions
/// returning `PrometheusJsonResponse`.
macro_rules! handle_schema_err {
    ($result:expr) => {
        match $result {
            Ok(v) => Some(v),
            Err(err) => {
                if err.status_code() == StatusCode::TableNotFound
                    || err.status_code() == StatusCode::TableColumnNotFound
                {
                    // Prometheus won't report error if querying nonexist label and metric
                    None
                } else {
                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
                }
            }
        }
    };
}
540
541#[axum_macros::debug_handler]
542#[tracing::instrument(
543    skip_all,
544    fields(protocol = "prometheus", request_type = "labels_query")
545)]
546pub async fn labels_query(
547    State(handler): State<PrometheusHandlerRef>,
548    Query(params): Query<LabelsQuery>,
549    Extension(mut query_ctx): Extension<QueryContext>,
550    Form(form_params): Form<LabelsQuery>,
551) -> PrometheusJsonResponse {
552    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
553    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
554    let query_ctx = Arc::new(query_ctx);
555
556    let mut queries = params.matches.0;
557    if queries.is_empty() {
558        queries = form_params.matches.0;
559    }
560
561    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
562        .with_label_values(&[query_ctx.get_db_string().as_str(), "labels_query"])
563        .start_timer();
564
565    // Fetch all tag columns. It will be used as white-list for tag names.
566    let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
567    {
568        Ok(labels) => labels,
569        Err(e) => return PrometheusJsonResponse::error(e.status_code(), e.output_msg()),
570    };
571    // insert the special metric name label
572    let _ = labels.insert(METRIC_NAME.to_string());
573
574    // Fetch all columns if no query matcher is provided
575    if queries.is_empty() {
576        let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
577        labels_vec.sort_unstable();
578        return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
579    }
580
581    // Otherwise, run queries and extract column name from result set.
582    let start = params
583        .start
584        .or(form_params.start)
585        .unwrap_or_else(yesterday_rfc3339);
586    let end = params
587        .end
588        .or(form_params.end)
589        .unwrap_or_else(current_time_rfc3339);
590    let lookback = params
591        .lookback
592        .or(form_params.lookback)
593        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
594
595    let mut fetched_labels = HashSet::new();
596    let _ = fetched_labels.insert(METRIC_NAME.to_string());
597
598    let mut merge_map = HashMap::new();
599    for query in queries {
600        let prom_query = PromQuery {
601            query,
602            start: start.clone(),
603            end: end.clone(),
604            step: DEFAULT_LOOKBACK_STRING.to_string(),
605            lookback: lookback.clone(),
606            alias: None,
607        };
608
609        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
610        handle_schema_err!(
611            retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
612                .await
613        );
614    }
615
616    // intersect `fetched_labels` with `labels` to filter out non-tag columns
617    fetched_labels.retain(|l| labels.contains(l));
618    let _ = labels.insert(METRIC_NAME.to_string());
619
620    let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
621    sorted_labels.sort();
622    let merge_map = merge_map
623        .into_iter()
624        .map(|(k, v)| (k, Value::from(v)))
625        .collect();
626    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
627    resp.resp_metrics = merge_map;
628    resp
629}
630
631/// Get all tag column name of the given schema
632async fn get_all_column_names(
633    catalog: &str,
634    schema: &str,
635    manager: &CatalogManagerRef,
636) -> std::result::Result<HashSet<String>, catalog::error::Error> {
637    let table_names = manager.table_names(catalog, schema, None).await?;
638
639    let mut labels = HashSet::new();
640    for table_name in table_names {
641        let Some(table) = manager.table(catalog, schema, &table_name, None).await? else {
642            continue;
643        };
644        for column in table.primary_key_columns() {
645            if column.name != DATA_SCHEMA_TABLE_ID_COLUMN_NAME
646                && column.name != DATA_SCHEMA_TSID_COLUMN_NAME
647            {
648                labels.insert(column.name);
649            }
650        }
651    }
652
653    Ok(labels)
654}
655
656async fn retrieve_series_from_query_result(
657    result: Result<Output>,
658    series: &mut Vec<HashMap<Column, String>>,
659    query_ctx: &QueryContext,
660    table_name: &str,
661    manager: &CatalogManagerRef,
662    metrics: &mut HashMap<String, u64>,
663) -> Result<()> {
664    let result = result?;
665
666    // fetch tag list
667    let table = manager
668        .table(
669            query_ctx.current_catalog(),
670            &query_ctx.current_schema(),
671            table_name,
672            Some(query_ctx),
673        )
674        .await
675        .context(CatalogSnafu)?
676        .with_context(|| TableNotFoundSnafu {
677            catalog: query_ctx.current_catalog(),
678            schema: query_ctx.current_schema(),
679            table: table_name,
680        })?;
681    let tag_columns = table
682        .primary_key_columns()
683        .map(|c| c.name)
684        .collect::<HashSet<_>>();
685
686    match result.data {
687        OutputData::RecordBatches(batches) => {
688            record_batches_to_series(batches, series, table_name, &tag_columns)
689        }
690        OutputData::Stream(stream) => {
691            let batches = RecordBatches::try_collect(stream)
692                .await
693                .context(CollectRecordbatchSnafu)?;
694            record_batches_to_series(batches, series, table_name, &tag_columns)
695        }
696        OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
697            reason: "expected data result, but got affected rows".to_string(),
698            location: Location::default(),
699        }),
700    }?;
701
702    if let Some(ref plan) = result.meta.plan {
703        collect_plan_metrics(plan, &mut [metrics]);
704    }
705    Ok(())
706}
707
708/// Retrieve labels name from query result
709async fn retrieve_labels_name_from_query_result(
710    result: Result<Output>,
711    labels: &mut HashSet<String>,
712    metrics: &mut HashMap<String, u64>,
713) -> Result<()> {
714    let result = result?;
715    match result.data {
716        OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
717        OutputData::Stream(stream) => {
718            let batches = RecordBatches::try_collect(stream)
719                .await
720                .context(CollectRecordbatchSnafu)?;
721            record_batches_to_labels_name(batches, labels)
722        }
723        OutputData::AffectedRows(_) => UnexpectedResultSnafu {
724            reason: "expected data result, but got affected rows".to_string(),
725        }
726        .fail(),
727    }?;
728    if let Some(ref plan) = result.meta.plan {
729        collect_plan_metrics(plan, &mut [metrics]);
730    }
731    Ok(())
732}
733
734fn record_batches_to_series(
735    batches: RecordBatches,
736    series: &mut Vec<HashMap<Column, String>>,
737    table_name: &str,
738    tag_columns: &HashSet<String>,
739) -> Result<()> {
740    for batch in batches.iter() {
741        // project record batch to only contains tag columns
742        let projection = batch
743            .schema
744            .column_schemas()
745            .iter()
746            .enumerate()
747            .filter_map(|(idx, col)| {
748                if tag_columns.contains(&col.name) {
749                    Some(idx)
750                } else {
751                    None
752                }
753            })
754            .collect::<Vec<_>>();
755        let batch = batch
756            .try_project(&projection)
757            .context(CollectRecordbatchSnafu)?;
758
759        let mut writer = RowWriter::new(&batch.schema, table_name);
760        writer.write(batch, series)?;
761    }
762    Ok(())
763}
764
/// Writer from a row in the record batch to a Prometheus time series:
///
/// `{__name__="<metric name>", <label name>="<label value>", ...}`
///
/// The metrics name is the table name; label names are the column names and
/// the label values are the corresponding row values (all are converted to strings).
struct RowWriter {
    /// The template that is to produce a Prometheus time series. It is pre-filled with metrics name
    /// and label names, waiting to be filled by row values afterward.
    /// A `None` value marks a column not yet filled for the current row.
    template: HashMap<Column, Option<String>>,
    /// The current filling row.
    /// Lazily cloned from `template` on the first insert of each row.
    current: Option<HashMap<Column, Option<String>>>,
}
778
779impl RowWriter {
780    fn new(schema: &SchemaRef, table: &str) -> Self {
781        let mut template = schema
782            .column_schemas()
783            .iter()
784            .map(|x| (x.name.as_str().into(), None))
785            .collect::<HashMap<Column, Option<String>>>();
786        template.insert("__name__".into(), Some(table.to_string()));
787        Self {
788            template,
789            current: None,
790        }
791    }
792
    /// Fills the value for `column` in the currently-building row, lazily
    /// cloning the pre-filled template on the first insert of a row.
    fn insert(&mut self, column: ColumnRef, value: impl ToString) {
        let current = self.current.get_or_insert_with(|| self.template.clone());
        // NOTE(review): looks up via `&dyn AsColumnRef` — presumably so an
        // existing template key can be found without allocating an owned
        // `Column` key first (`ColumnRef`/`AsColumnRef` are defined elsewhere
        // in this file); confirm against their definitions.
        match current.get_mut(&column as &dyn AsColumnRef) {
            Some(x) => {
                let _ = x.insert(value.to_string());
            }
            None => {
                // Column not present in the template: insert it as a fresh owned key.
                let _ = current.insert(column.0.into(), Some(value.to_string()));
            }
        }
    }
804
805    fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
806        let column_name = column_schema.name.as_str().into();
807
808        if column_schema.data_type.is_json() {
809            let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
810            self.insert(column_name, s);
811        } else {
812            let hex = bytes
813                .iter()
814                .map(|b| format!("{b:02x}"))
815                .collect::<Vec<String>>()
816                .join("");
817            self.insert(column_name, hex);
818        }
819        Ok(())
820    }
821
822    fn finish(&mut self) -> HashMap<Column, String> {
823        let Some(current) = self.current.take() else {
824            return HashMap::new();
825        };
826        current
827            .into_iter()
828            .filter_map(|(k, v)| v.map(|v| (k, v)))
829            .collect()
830    }
831
    /// Converts each row of `record_batch` into a `Column -> String` map and
    /// appends one map per row to `series`.
    ///
    /// Cells are stringified according to their Arrow data type; a null slot
    /// becomes the literal `"Null"`. Binary cells are delegated to
    /// `insert_bytes` (JSONB decoding or hex). Data types without a defined
    /// conversion produce a `NotSupported` error.
    fn write(
        &mut self,
        record_batch: RecordBatch,
        series: &mut Vec<HashMap<Column, String>>,
    ) -> Result<()> {
        let schema = record_batch.schema.clone();
        let record_batch = record_batch.into_df_record_batch();
        for i in 0..record_batch.num_rows() {
            for (j, array) in record_batch.columns().iter().enumerate() {
                let column = schema.column_name_by_index(j).into();

                // Null slots short-circuit the type dispatch below.
                if array.is_null(i) {
                    self.insert(column, "Null");
                    continue;
                }

                match array.data_type() {
                    DataType::Null => {
                        self.insert(column, "Null");
                    }
                    DataType::Boolean => {
                        let array = array.as_boolean();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Numeric types: insert the primitive value directly.
                    DataType::UInt8 => {
                        let array = array.as_primitive::<UInt8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt16 => {
                        let array = array.as_primitive::<UInt16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt32 => {
                        let array = array.as_primitive::<UInt32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::UInt64 => {
                        let array = array.as_primitive::<UInt64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int8 => {
                        let array = array.as_primitive::<Int8Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int16 => {
                        let array = array.as_primitive::<Int16Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int32 => {
                        let array = array.as_primitive::<Int32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Int64 => {
                        let array = array.as_primitive::<Int64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float32 => {
                        let array = array.as_primitive::<Float32Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Float64 => {
                        let array = array.as_primitive::<Float64Type>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // String types: insert the borrowed string value.
                    DataType::Utf8 => {
                        let array = array.as_string::<i32>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::LargeUtf8 => {
                        let array = array.as_string::<i64>();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    DataType::Utf8View => {
                        let array = array.as_string_view();
                        let v = array.value(i);
                        self.insert(column, v);
                    }
                    // Binary payloads go through `insert_bytes`, which decodes
                    // JSONB columns and hex-encodes everything else.
                    DataType::Binary => {
                        let array = array.as_binary::<i32>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::LargeBinary => {
                        let array = array.as_binary::<i64>();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::BinaryView => {
                        let array = array.as_binary_view();
                        let v = array.value(i);
                        let column_schema = &schema.column_schemas()[j];
                        self.insert_bytes(column_schema, v)?;
                    }
                    DataType::Date32 => {
                        let array = array.as_primitive::<Date32Type>();
                        let v = Date::new(array.value(i));
                        self.insert(column, v);
                    }
                    DataType::Date64 => {
                        let array = array.as_primitive::<Date64Type>();
                        // `Date64` values are milliseconds representation of `Date32` values,
                        // according to its specification. So we convert the `Date64` value here to
                        // the `Date32` value to process them unified.
                        let v = Date::new((array.value(i) / 86_400_000) as i32);
                        self.insert(column, v);
                    }
                    // Temporal values are rendered as ISO-8601 strings.
                    DataType::Timestamp(time_unit, _) => {
                        let v = match time_unit {
                            TimeUnit::Second => {
                                let array = array.as_primitive::<TimestampSecondType>();
                                array.value(i)
                            }
                            TimeUnit::Millisecond => {
                                let array = array.as_primitive::<TimestampMillisecondType>();
                                array.value(i)
                            }
                            TimeUnit::Microsecond => {
                                let array = array.as_primitive::<TimestampMicrosecondType>();
                                array.value(i)
                            }
                            TimeUnit::Nanosecond => {
                                let array = array.as_primitive::<TimestampNanosecondType>();
                                array.value(i)
                            }
                        };
                        let v = Timestamp::new(v, time_unit.into());
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
                        let v = match time_unit {
                            TimeUnit::Second => {
                                let array = array.as_primitive::<Time32SecondType>();
                                Time::new_second(array.value(i) as i64)
                            }
                            TimeUnit::Millisecond => {
                                let array = array.as_primitive::<Time32MillisecondType>();
                                Time::new_millisecond(array.value(i) as i64)
                            }
                            TimeUnit::Microsecond => {
                                let array = array.as_primitive::<Time64MicrosecondType>();
                                Time::new_microsecond(array.value(i))
                            }
                            TimeUnit::Nanosecond => {
                                let array = array.as_primitive::<Time64NanosecondType>();
                                Time::new_nanosecond(array.value(i))
                            }
                        };
                        self.insert(column, v.to_iso8601_string());
                    }
                    DataType::Interval(interval_unit) => match interval_unit {
                        IntervalUnit::YearMonth => {
                            let array = array.as_primitive::<IntervalYearMonthType>();
                            let v: IntervalYearMonth = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::DayTime => {
                            let array = array.as_primitive::<IntervalDayTimeType>();
                            let v: IntervalDayTime = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                        IntervalUnit::MonthDayNano => {
                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
                            let v: IntervalMonthDayNano = array.value(i).into();
                            self.insert(column, v.to_iso8601_string());
                        }
                    },
                    DataType::Duration(time_unit) => {
                        let v = match time_unit {
                            TimeUnit::Second => {
                                let array = array.as_primitive::<DurationSecondType>();
                                array.value(i)
                            }
                            TimeUnit::Millisecond => {
                                let array = array.as_primitive::<DurationMillisecondType>();
                                array.value(i)
                            }
                            TimeUnit::Microsecond => {
                                let array = array.as_primitive::<DurationMicrosecondType>();
                                array.value(i)
                            }
                            TimeUnit::Nanosecond => {
                                let array = array.as_primitive::<DurationNanosecondType>();
                                array.value(i)
                            }
                        };
                        let d = Duration::new(v, time_unit.into());
                        self.insert(column, d);
                    }
                    // Nested types fall back to DataFusion's ScalarValue
                    // rendering.
                    DataType::List(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Struct(_) => {
                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
                        self.insert(column, v);
                    }
                    DataType::Decimal128(precision, scale) => {
                        let array = array.as_primitive::<Decimal128Type>();
                        let v = Decimal128::new(array.value(i), *precision, *scale);
                        self.insert(column, v);
                    }
                    // Remaining Arrow types have no defined textual
                    // representation here.
                    _ => {
                        return NotSupportedSnafu {
                            feat: format!("convert {} to http value", array.data_type()),
                        }
                        .fail();
                    }
                }
            }

            // Seal the finished row's label map.
            series.push(self.finish())
        }
        Ok(())
    }
1061}
1062
/// A borrowed, copyable view of a column name.
///
/// Used together with [`AsColumnRef`] to look up `HashMap<Column, _>` entries
/// by `&str` without allocating an owned [`Column`] key.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);

impl<'a> From<&'a str> for ColumnRef<'a> {
    fn from(s: &'a str) -> Self {
        Self(s)
    }
}

/// Unifies owned [`Column`] keys and borrowed [`ColumnRef`]s behind one
/// object-safe accessor, so both can act as `dyn AsColumnRef` map keys.
trait AsColumnRef {
    fn as_ref(&self) -> ColumnRef<'_>;
}

impl AsColumnRef for Column {
    fn as_ref(&self) -> ColumnRef<'_> {
        self.0.as_str().into()
    }
}

impl AsColumnRef for ColumnRef<'_> {
    fn as_ref(&self) -> ColumnRef<'_> {
        *self
    }
}

// Equality and hashing are delegated to the underlying name string, so an
// owned `Column` and a borrowed `ColumnRef` with the same name compare equal
// and hash identically.
impl<'a> PartialEq for dyn AsColumnRef + 'a {
    fn eq(&self, other: &Self) -> bool {
        self.as_ref() == other.as_ref()
    }
}

impl<'a> Eq for dyn AsColumnRef + 'a {}

impl<'a> Hash for dyn AsColumnRef + 'a {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_ref().0.hash(state);
    }
}

// Enables `HashMap<Column, _>` lookups keyed by a `&dyn AsColumnRef`.
// NOTE(review): the `Borrow` contract requires `Column`'s own `Hash`/`Eq` to
// agree with the `dyn AsColumnRef` impls above (i.e. hash exactly the name
// string) — confirm against `Column`'s definition.
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
    fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
        self
    }
}
1107
1108/// Retrieve labels name from record batches
1109fn record_batches_to_labels_name(
1110    batches: RecordBatches,
1111    labels: &mut HashSet<String>,
1112) -> Result<()> {
1113    let mut column_indices = Vec::new();
1114    let mut field_column_indices = Vec::new();
1115    for (i, column) in batches.schema().column_schemas().iter().enumerate() {
1116        if let ConcreteDataType::Float64(_) = column.data_type {
1117            field_column_indices.push(i);
1118        }
1119        column_indices.push(i);
1120    }
1121
1122    if field_column_indices.is_empty() {
1123        return Err(Error::Internal {
1124            err_msg: "no value column found".to_string(),
1125        });
1126    }
1127
1128    for batch in batches.iter() {
1129        let names = column_indices
1130            .iter()
1131            .map(|c| batches.schema().column_name_by_index(*c).to_string())
1132            .collect::<Vec<_>>();
1133
1134        let field_columns = field_column_indices
1135            .iter()
1136            .map(|i| {
1137                batch
1138                    .column(*i)
1139                    .as_any()
1140                    .downcast_ref::<Float64Vector>()
1141                    .unwrap()
1142            })
1143            .collect::<Vec<_>>();
1144
1145        for row_index in 0..batch.num_rows() {
1146            // if all field columns are null, skip this row
1147            if field_columns
1148                .iter()
1149                .all(|c| c.get_data(row_index).is_none())
1150            {
1151                continue;
1152            }
1153
1154            // if a field is not null, record the tag name and return
1155            names.iter().for_each(|name| {
1156                let _ = labels.insert(name.clone());
1157            });
1158            return Ok(());
1159        }
1160    }
1161    Ok(())
1162}
1163
1164pub(crate) fn retrieve_metric_name_and_result_type(
1165    promql: &str,
1166) -> Result<(Option<String>, ValueType)> {
1167    let promql_expr = promql_parser::parser::parse(promql)
1168        .map_err(|reason| InvalidQuerySnafu { reason }.build())?;
1169    let metric_name = promql_expr_to_metric_name(&promql_expr);
1170    let result_type = promql_expr.value_type();
1171
1172    Ok((metric_name, result_type))
1173}
1174
1175/// Tries to get catalog and schema from an optional db param. And retrieves
1176/// them from [QueryContext] if they don't present.
1177pub(crate) fn get_catalog_schema(db: &Option<String>, ctx: &QueryContext) -> (String, String) {
1178    if let Some(db) = db {
1179        parse_catalog_and_schema_from_db_string(db)
1180    } else {
1181        (
1182            ctx.current_catalog().to_string(),
1183            ctx.current_schema().clone(),
1184        )
1185    }
1186}
1187
1188/// Update catalog and schema in [QueryContext] if necessary.
1189pub(crate) fn try_update_catalog_schema(ctx: &mut QueryContext, catalog: &str, schema: &str) {
1190    if ctx.current_catalog() != catalog || ctx.current_schema() != schema {
1191        ctx.set_current_catalog(catalog);
1192        ctx.set_current_schema(schema);
1193    }
1194}
1195
1196fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
1197    let mut metric_names = HashSet::new();
1198    collect_metric_names(expr, &mut metric_names);
1199
1200    // Return the metric name only if there's exactly one unique metric name
1201    if metric_names.len() == 1 {
1202        metric_names.into_iter().next()
1203    } else {
1204        None
1205    }
1206}
1207
/// Recursively collect all metric names from a PromQL expression
///
/// Clears `metric_names` (meaning "no attributable metric") whenever the
/// expression can change or drop the `__name__` label: aggregations whose
/// `by` clause omits `__name__` or whose `without` clause includes it, unary
/// expressions, and binary operators other than the set operations
/// `and`/`or`/`unless`.
fn collect_metric_names(expr: &PromqlExpr, metric_names: &mut HashSet<String>) {
    match expr {
        PromqlExpr::Aggregate(AggregateExpr { modifier, expr, .. }) => {
            match modifier {
                Some(LabelModifier::Include(labels)) => {
                    // `by (...)` that doesn't keep __name__ drops the metric name.
                    if !labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                Some(LabelModifier::Exclude(labels)) => {
                    // `without (...)` that lists __name__ drops the metric name.
                    if labels.labels.contains(&METRIC_NAME.to_string()) {
                        metric_names.clear();
                        return;
                    }
                }
                _ => {}
            }
            collect_metric_names(expr, metric_names)
        }
        PromqlExpr::Unary(UnaryExpr { .. }) => metric_names.clear(),
        PromqlExpr::Binary(BinaryExpr { lhs, op, .. }) => {
            // Set operations preserve series from the left-hand side; any
            // other binary operator produces a derived value, so no single
            // metric name applies.
            if matches!(
                op.id(),
                token::T_LAND // INTERSECT
                    | token::T_LOR // UNION
                    | token::T_LUNLESS // EXCEPT
            ) {
                collect_metric_names(lhs, metric_names)
            } else {
                metric_names.clear()
            }
        }
        PromqlExpr::Paren(ParenExpr { expr }) => collect_metric_names(expr, metric_names),
        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => collect_metric_names(expr, metric_names),
        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
            // Prefer the explicit name; otherwise use the first __name__ matcher.
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
            let VectorSelector { name, matchers, .. } = vs;
            if let Some(name) = name {
                metric_names.insert(name.clone());
            } else if let Some(matcher) = matchers.find_matchers(METRIC_NAME).into_iter().next() {
                metric_names.insert(matcher.value);
            }
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args
                .iter()
                .for_each(|e| collect_metric_names(e, metric_names));
        }
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
    }
}
1267
1268fn find_metric_name_and_matchers<E, F>(expr: &PromqlExpr, f: F) -> Option<E>
1269where
1270    F: Fn(&Option<String>, &Matchers) -> Option<E> + Clone,
1271{
1272    match expr {
1273        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1274        PromqlExpr::Unary(UnaryExpr { expr }) => find_metric_name_and_matchers(expr, f),
1275        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1276            find_metric_name_and_matchers(lhs, f.clone()).or(find_metric_name_and_matchers(rhs, f))
1277        }
1278        PromqlExpr::Paren(ParenExpr { expr }) => find_metric_name_and_matchers(expr, f),
1279        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => find_metric_name_and_matchers(expr, f),
1280        PromqlExpr::NumberLiteral(_) => None,
1281        PromqlExpr::StringLiteral(_) => None,
1282        PromqlExpr::Extension(_) => None,
1283        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => f(name, matchers),
1284        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1285            let VectorSelector { name, matchers, .. } = vs;
1286
1287            f(name, matchers)
1288        }
1289        PromqlExpr::Call(Call { args, .. }) => args
1290            .args
1291            .iter()
1292            .find_map(|e| find_metric_name_and_matchers(e, f.clone())),
1293    }
1294}
1295
/// Try to find the `__name__` matchers which op is not `MatchOp::Equal`.
///
/// Returns `None` when the first selector found carries an explicit metric
/// name (its `__name__` matchers are then irrelevant); otherwise returns that
/// selector's `__name__` matchers filtered to non-equality ops, each
/// normalized via `normalize_matcher`. The returned `Vec` may be empty.
fn find_metric_name_not_equal_matchers(expr: &PromqlExpr) -> Option<Vec<Matcher>> {
    find_metric_name_and_matchers(expr, |name, matchers| {
        // Has name, ignore the matchers
        if name.is_some() {
            return None;
        }

        // FIXME(dennis): we don't consider the nested and `or` matchers yet.
        Some(matchers.find_matchers(METRIC_NAME))
    })
    .map(|matchers| {
        matchers
            .into_iter()
            .filter(|m| !matches!(m.op, MatchOp::Equal))
            .map(normalize_matcher)
            .collect::<Vec<_>>()
    })
}
1315
1316/// Update the `__name__` matchers in expression into special value
1317/// Returns the updated expression.
1318fn update_metric_name_matcher(expr: &mut PromqlExpr, metric_name: &str) {
1319    match expr {
1320        PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => {
1321            update_metric_name_matcher(expr, metric_name)
1322        }
1323        PromqlExpr::Unary(UnaryExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1324        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
1325            update_metric_name_matcher(lhs, metric_name);
1326            update_metric_name_matcher(rhs, metric_name);
1327        }
1328        PromqlExpr::Paren(ParenExpr { expr }) => update_metric_name_matcher(expr, metric_name),
1329        PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => {
1330            update_metric_name_matcher(expr, metric_name)
1331        }
1332        PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
1333            if name.is_some() {
1334                return;
1335            }
1336
1337            for m in &mut matchers.matchers {
1338                if m.name == METRIC_NAME {
1339                    m.op = MatchOp::Equal;
1340                    m.value = metric_name.to_string();
1341                }
1342            }
1343        }
1344        PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
1345            let VectorSelector { name, matchers, .. } = vs;
1346            if name.is_some() {
1347                return;
1348            }
1349
1350            for m in &mut matchers.matchers {
1351                if m.name == METRIC_NAME {
1352                    m.op = MatchOp::Equal;
1353                    m.value = metric_name.to_string();
1354                }
1355            }
1356        }
1357        PromqlExpr::Call(Call { args, .. }) => {
1358            args.args.iter_mut().for_each(|e| {
1359                update_metric_name_matcher(e, metric_name);
1360            });
1361        }
1362        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) | PromqlExpr::Extension(_) => {}
1363    }
1364}
1365
/// Query string parameters of the Prometheus label-values API
/// (`/api/v1/label/<label_name>/values`).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LabelValueQuery {
    // Range start; defaults to 24h ago in `label_values_query`.
    start: Option<String>,
    // Range end; defaults to "now" in `label_values_query`.
    end: Option<String>,
    // NOTE(review): not read in the visible handler path — confirm whether
    // lookback is honored elsewhere or accepted-and-ignored on purpose.
    lookback: Option<String>,
    // `match[]` series selectors, flattened from the query string.
    #[serde(flatten)]
    matches: Matches,
    // Optional database override, parsed by
    // `parse_catalog_and_schema_from_db_string`.
    db: Option<String>,
    // Maximum number of values to return; absent or `0` means unlimited.
    limit: Option<usize>,
}
1376
/// Handler of the Prometheus `label/<label_name>/values` endpoint.
///
/// Special label names are answered from catalog metadata: `__name__` lists
/// metric-engine table names, the field-name label lists field column names,
/// and the schema/database labels list schema names. Any other label
/// requires `match[]` selectors and is answered by querying the matched
/// metrics within the requested time range.
#[axum_macros::debug_handler]
#[tracing::instrument(
    skip_all,
    fields(protocol = "prometheus", request_type = "label_values_query")
)]
pub async fn label_values_query(
    State(handler): State<PrometheusHandlerRef>,
    Path(label_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(params): Query<LabelValueQuery>,
) -> PrometheusJsonResponse {
    // Resolve the catalog/schema (possibly overridden by `db`) before
    // freezing the context for the handler calls.
    let (catalog, schema) = get_catalog_schema(&params.db, &query_ctx);
    try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
    let query_ctx = Arc::new(query_ctx);

    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
        .with_label_values(&[query_ctx.get_db_string().as_str(), "label_values_query"])
        .start_timer();

    // `__name__` values are the metric-engine table names.
    if label_name == METRIC_NAME_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut table_names = try_call_return_response!(
            retrieve_table_names(&query_ctx, catalog_manager, params.matches.0).await
        );

        truncate_results(&mut table_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
    } else if label_name == FIELD_NAME_LABEL {
        // Field label values are the field column names of the matched
        // tables; schema errors degrade to an empty result.
        let field_columns = handle_schema_err!(
            retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0).await
        )
        .unwrap_or_default();
        let mut field_columns = field_columns.into_iter().collect::<Vec<_>>();
        field_columns.sort_unstable();
        truncate_results(&mut field_columns, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(field_columns));
    } else if label_name == SCHEMA_LABEL || label_name == DATABASE_LABEL {
        let catalog_manager = handler.catalog_manager();

        let mut schema_names = try_call_return_response!(
            retrieve_schema_names(&query_ctx, catalog_manager, params.matches.0).await
        );
        truncate_results(&mut schema_names, params.limit);
        return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(schema_names));
    }

    // Generic labels need at least one series selector to scope the query.
    let queries = params.matches.0;
    if queries.is_empty() {
        return PrometheusJsonResponse::error(
            StatusCode::InvalidArguments,
            "match[] parameter is required",
        );
    }

    // Default time range: the last 24 hours.
    let start = params.start.unwrap_or_else(yesterday_rfc3339);
    let end = params.end.unwrap_or_else(current_time_rfc3339);
    let mut label_values = HashSet::new();

    let start = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&start)
            .context(ParseTimestampSnafu { timestamp: &start })
    );
    let end = try_call_return_response!(
        QueryLanguageParser::parse_promql_timestamp(&end)
            .context(ParseTimestampSnafu { timestamp: &end })
    );

    for query in queries {
        // Each selector must be a plain vector selector carrying a metric
        // name (explicit or via a `__name__` equality matcher).
        let promql_expr = try_call_return_response!(promql_parser::parser::parse(&query));
        let PromqlExpr::VectorSelector(mut vector_selector) = promql_expr else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected vector selector",
            );
        };
        let Some(name) = take_metric_name(&mut vector_selector) else {
            return PrometheusJsonResponse::error(
                StatusCode::InvalidArguments,
                "expected metric name",
            );
        };
        let VectorSelector { matchers, .. } = vector_selector;
        // Only use and filter matchers.
        let matchers = matchers.matchers;
        let result = handler
            .query_label_values(name, label_name.clone(), matchers, start, end, &query_ctx)
            .await;
        if let Some(result) = handle_schema_err!(result) {
            label_values.extend(result.into_iter());
        }
    }

    // The HashSet de-duplicated the values; return them sorted ascending.
    let mut label_values: Vec<_> = label_values.into_iter().collect();
    label_values.sort_unstable();
    truncate_results(&mut label_values, params.limit);

    PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
}
1476
/// Truncates `label_values` to at most `limit` entries.
///
/// A missing limit or a limit of `0` means "no limit". The previous
/// `len() >= limit` pre-check was redundant: `Vec::truncate` is already a
/// no-op when the vector is short enough.
fn truncate_results(label_values: &mut Vec<String>, limit: Option<usize>) {
    match limit {
        Some(limit) if limit > 0 => label_values.truncate(limit),
        _ => {}
    }
}
1485
1486/// Take metric name from the [VectorSelector].
1487/// It takes the name in the selector or removes the name matcher.
1488fn take_metric_name(selector: &mut VectorSelector) -> Option<String> {
1489    if let Some(name) = selector.name.take() {
1490        return Some(name);
1491    }
1492
1493    let (pos, matcher) = selector
1494        .matchers
1495        .matchers
1496        .iter()
1497        .find_position(|matcher| matcher.name == "__name__" && matcher.op == MatchOp::Equal)?;
1498    let name = matcher.value.clone();
1499    // We need to remove the name matcher to avoid using it as a filter in query.
1500    selector.matchers.matchers.remove(pos);
1501
1502    Some(name)
1503}
1504
/// Lists metric-engine table names in the current catalog/schema, optionally
/// filtered by a `__name__` matcher taken from the first `match[]` selector.
///
/// Only tables carrying `LOGICAL_TABLE_METADATA_KEY` in their extra options
/// are considered. The result is sorted ascending.
async fn retrieve_table_names(
    query_ctx: &QueryContext,
    catalog_manager: CatalogManagerRef,
    matches: Vec<String>,
) -> Result<Vec<String>> {
    let catalog = query_ctx.current_catalog();
    let schema = query_ctx.current_schema();

    let mut tables_stream = catalog_manager.tables(catalog, &schema, Some(query_ctx));
    let mut table_names = Vec::new();

    // we only provide very limited support for matcher against __name__
    let name_matcher = matches
        .first()
        .and_then(|matcher| promql_parser::parser::parse(matcher).ok())
        .and_then(|expr| {
            // Only a plain vector selector's first `__name__` matcher is used.
            if let PromqlExpr::VectorSelector(vector_selector) = expr {
                let matchers = vector_selector.matchers.matchers;
                for matcher in matchers {
                    if matcher.name == METRIC_NAME_LABEL {
                        return Some(matcher);
                    }
                }

                None
            } else {
                None
            }
        });

    while let Some(table) = tables_stream.next().await {
        let table = table.context(CatalogSnafu)?;
        if !table
            .table_info()
            .meta
            .options
            .extra_options
            .contains_key(LOGICAL_TABLE_METADATA_KEY)
        {
            // skip non-prometheus (non-metricengine) tables for __name__ query
            continue;
        }

        let table_name = &table.table_info().name;

        if let Some(matcher) = &name_matcher {
            match &matcher.op {
                MatchOp::Equal => {
                    if table_name == &matcher.value {
                        table_names.push(table_name.clone());
                    }
                }
                MatchOp::Re(reg) => {
                    if reg.is_match(table_name) {
                        table_names.push(table_name.clone());
                    }
                }
                _ => {
                    // != and !~ are not supported:
                    // vector must contains at least one non-empty matcher
                    table_names.push(table_name.clone());
                }
            }
        } else {
            table_names.push(table_name.clone());
        }
    }

    table_names.sort_unstable();
    Ok(table_names)
}
1576
1577async fn retrieve_field_names(
1578    query_ctx: &QueryContext,
1579    manager: CatalogManagerRef,
1580    matches: Vec<String>,
1581) -> Result<HashSet<String>> {
1582    let mut field_columns = HashSet::new();
1583    let catalog = query_ctx.current_catalog();
1584    let schema = query_ctx.current_schema();
1585
1586    if matches.is_empty() {
1587        // query all tables if no matcher is provided
1588        while let Some(table) = manager
1589            .tables(catalog, &schema, Some(query_ctx))
1590            .next()
1591            .await
1592        {
1593            let table = table.context(CatalogSnafu)?;
1594            for column in table.field_columns() {
1595                field_columns.insert(column.name);
1596            }
1597        }
1598        return Ok(field_columns);
1599    }
1600
1601    for table_name in matches {
1602        let table = manager
1603            .table(catalog, &schema, &table_name, Some(query_ctx))
1604            .await
1605            .context(CatalogSnafu)?
1606            .with_context(|| TableNotFoundSnafu {
1607                catalog: catalog.to_string(),
1608                schema: schema.clone(),
1609                table: table_name.clone(),
1610            })?;
1611
1612        for column in table.field_columns() {
1613            field_columns.insert(column.name);
1614        }
1615    }
1616    Ok(field_columns)
1617}
1618
1619async fn retrieve_schema_names(
1620    query_ctx: &QueryContext,
1621    catalog_manager: CatalogManagerRef,
1622    matches: Vec<String>,
1623) -> Result<Vec<String>> {
1624    let mut schemas = Vec::new();
1625    let catalog = query_ctx.current_catalog();
1626
1627    let candidate_schemas = catalog_manager
1628        .schema_names(catalog, Some(query_ctx))
1629        .await
1630        .context(CatalogSnafu)?;
1631
1632    for schema in candidate_schemas {
1633        let mut found = true;
1634        for match_item in &matches {
1635            if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
1636                let exists = catalog_manager
1637                    .table_exists(catalog, &schema, &table_name, Some(query_ctx))
1638                    .await
1639                    .context(CatalogSnafu)?;
1640                if !exists {
1641                    found = false;
1642                    break;
1643                }
1644            }
1645        }
1646
1647        if found {
1648            schemas.push(schema);
1649        }
1650    }
1651
1652    schemas.sort_unstable();
1653
1654    Ok(schemas)
1655}
1656
1657/// Try to parse and extract the name of referenced metric from the promql query.
1658///
1659/// Returns the metric name if exactly one unique metric is referenced, otherwise None.
1660/// Multiple references to the same metric are allowed.
1661fn retrieve_metric_name_from_promql(query: &str) -> Option<String> {
1662    let promql_expr = promql_parser::parser::parse(query).ok()?;
1663    promql_expr_to_metric_name(&promql_expr)
1664}
1665
/// Parameters of a Prometheus series query, deserialized from either the URL
/// query string or the form body (see `series_query`).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesQuery {
    /// Start of the time range; defaults to yesterday (RFC3339) when absent.
    start: Option<String>,
    /// End of the time range; defaults to the current time (RFC3339) when absent.
    end: Option<String>,
    /// PromQL lookback delta; defaults to `DEFAULT_LOOKBACK_STRING` when absent.
    lookback: Option<String>,
    /// Repeated `match[]` selectors, collected by the `Matches` deserializer.
    #[serde(flatten)]
    matches: Matches,
    /// Optional database string used to switch the query context's
    /// catalog/schema (parsed by `parse_catalog_and_schema_from_db_string`).
    db: Option<String>,
}
1675
1676#[axum_macros::debug_handler]
1677#[tracing::instrument(
1678    skip_all,
1679    fields(protocol = "prometheus", request_type = "series_query")
1680)]
1681pub async fn series_query(
1682    State(handler): State<PrometheusHandlerRef>,
1683    Query(params): Query<SeriesQuery>,
1684    Extension(mut query_ctx): Extension<QueryContext>,
1685    Form(form_params): Form<SeriesQuery>,
1686) -> PrometheusJsonResponse {
1687    let mut queries: Vec<String> = params.matches.0;
1688    if queries.is_empty() {
1689        queries = form_params.matches.0;
1690    }
1691    if queries.is_empty() {
1692        return PrometheusJsonResponse::error(
1693            StatusCode::Unsupported,
1694            "match[] parameter is required",
1695        );
1696    }
1697    let start = params
1698        .start
1699        .or(form_params.start)
1700        .unwrap_or_else(yesterday_rfc3339);
1701    let end = params
1702        .end
1703        .or(form_params.end)
1704        .unwrap_or_else(current_time_rfc3339);
1705    let lookback = params
1706        .lookback
1707        .or(form_params.lookback)
1708        .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());
1709
1710    // update catalog and schema in query context if necessary
1711    if let Some(db) = &params.db {
1712        let (catalog, schema) = parse_catalog_and_schema_from_db_string(db);
1713        try_update_catalog_schema(&mut query_ctx, &catalog, &schema);
1714    }
1715    let query_ctx = Arc::new(query_ctx);
1716
1717    let _timer = crate::metrics::METRIC_HTTP_PROMETHEUS_PROMQL_ELAPSED
1718        .with_label_values(&[query_ctx.get_db_string().as_str(), "series_query"])
1719        .start_timer();
1720
1721    let mut series = Vec::new();
1722    let mut merge_map = HashMap::new();
1723    for query in queries {
1724        let table_name = retrieve_metric_name_from_promql(&query).unwrap_or_default();
1725        let prom_query = PromQuery {
1726            query,
1727            start: start.clone(),
1728            end: end.clone(),
1729            // TODO: find a better value for step
1730            step: DEFAULT_LOOKBACK_STRING.to_string(),
1731            lookback: lookback.clone(),
1732            alias: None,
1733        };
1734        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
1735
1736        handle_schema_err!(
1737            retrieve_series_from_query_result(
1738                result,
1739                &mut series,
1740                &query_ctx,
1741                &table_name,
1742                &handler.catalog_manager(),
1743                &mut merge_map,
1744            )
1745            .await
1746        );
1747    }
1748    let merge_map = merge_map
1749        .into_iter()
1750        .map(|(k, v)| (k, Value::from(v)))
1751        .collect();
1752    let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
1753    resp.resp_metrics = merge_map;
1754    resp
1755}
1756
/// Parameters of a PromQL parse request, deserialized from either the URL
/// query string or the form body (see `parse_query`).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ParseQuery {
    /// The PromQL expression to parse; `parse_query` rejects requests without it.
    query: Option<String>,
    /// Accepted for API symmetry with the other endpoints; `parse_query`
    /// does not read it.
    db: Option<String>,
}
1762
1763#[axum_macros::debug_handler]
1764#[tracing::instrument(
1765    skip_all,
1766    fields(protocol = "prometheus", request_type = "parse_query")
1767)]
1768pub async fn parse_query(
1769    State(_handler): State<PrometheusHandlerRef>,
1770    Query(params): Query<ParseQuery>,
1771    Extension(_query_ctx): Extension<QueryContext>,
1772    Form(form_params): Form<ParseQuery>,
1773) -> PrometheusJsonResponse {
1774    if let Some(query) = params.query.or(form_params.query) {
1775        let ast = try_call_return_response!(promql_parser::parser::parse(&query));
1776        PrometheusJsonResponse::success(PrometheusResponse::ParseResult(ast))
1777    } else {
1778        PrometheusJsonResponse::error(StatusCode::InvalidArguments, "query is required")
1779    }
1780}
1781
#[cfg(test)]
mod tests {
    use promql_parser::parser::value::ValueType;

    use super::*;

    /// Table-driven check of `retrieve_metric_name_and_result_type`.
    ///
    /// Each row is `(case name, promql, expected metric, expected value type,
    /// expects error)`; the expected type is irrelevant for error rows.
    #[test]
    fn test_retrieve_metric_name_and_result_type() {
        type Case = (
            &'static str,
            &'static str,
            Option<&'static str>,
            ValueType,
            bool,
        );
        let cases: &[Case] = &[
            // Single metric cases
            (
                "simple metric",
                "cpu_usage",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "metric with selector",
                r#"cpu_usage{instance="localhost"}"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "metric with range selector",
                "cpu_usage[5m]",
                Some("cpu_usage"),
                ValueType::Matrix,
                false,
            ),
            (
                "metric with __name__ matcher",
                r#"{__name__="cpu_usage"}"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "metric with unary operator",
                "-cpu_usage",
                None,
                ValueType::Vector,
                false,
            ),
            // Aggregation and function cases
            (
                "metric with aggregation",
                "sum(cpu_usage)",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "complex aggregation",
                r#"sum by (instance) (cpu_usage{job="node"})"#,
                None,
                ValueType::Vector,
                false,
            ),
            (
                "complex aggregation",
                r#"sum by (__name__) (cpu_usage{job="node"})"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "complex aggregation",
                r#"sum without (instance) (cpu_usage{job="node"})"#,
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            // Same metric binary operations
            (
                "same metric addition",
                "cpu_usage + cpu_usage",
                None,
                ValueType::Vector,
                false,
            ),
            (
                "metric with scalar addition",
                r#"sum(rate(cpu_usage{job="node"}[5m])) + 100"#,
                None,
                ValueType::Vector,
                false,
            ),
            // Multiple metrics cases
            (
                "different metrics addition",
                "cpu_usage + memory_usage",
                None,
                ValueType::Vector,
                false,
            ),
            (
                "different metrics subtraction",
                "network_in - network_out",
                None,
                ValueType::Vector,
                false,
            ),
            // Unless operator cases
            (
                "unless with different metrics",
                "cpu_usage unless memory_usage",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            (
                "unless with same metric",
                "cpu_usage unless cpu_usage",
                Some("cpu_usage"),
                ValueType::Vector,
                false,
            ),
            // Subquery cases
            (
                "basic subquery",
                "cpu_usage[5m:1m]",
                Some("cpu_usage"),
                ValueType::Matrix,
                false,
            ),
            (
                "subquery with multiple metrics",
                "(cpu_usage + memory_usage)[5m:1m]",
                None,
                ValueType::Matrix,
                false,
            ),
            // Literal values
            ("scalar value", "42", None, ValueType::Scalar, false),
            (
                "string literal",
                r#""hello world""#,
                None,
                ValueType::String,
                false,
            ),
            // Error cases
            (
                "invalid syntax",
                "cpu_usage{invalid=",
                None,
                ValueType::Vector,
                true,
            ),
            ("empty query", "", None, ValueType::Vector, true),
            (
                "malformed brackets",
                "cpu_usage[5m",
                None,
                ValueType::Vector,
                true,
            ),
        ];

        for (name, promql, expected_metric, expected_type, should_error) in cases {
            let result = retrieve_metric_name_and_result_type(promql);

            if *should_error {
                assert!(
                    result.is_err(),
                    "Test '{}' should have failed but succeeded with: {:?}",
                    name,
                    result
                );
                continue;
            }

            let (metric_name, value_type) = result.unwrap_or_else(|e| {
                panic!(
                    "Test '{}' should have succeeded but failed with error: {}",
                    name, e
                )
            });

            let expected_metric_name = expected_metric.map(|s| s.to_string());
            assert_eq!(
                metric_name, expected_metric_name,
                "Test '{}': metric name mismatch. Expected: {:?}, Got: {:?}",
                name, expected_metric_name, metric_name
            );

            assert_eq!(
                value_type, *expected_type,
                "Test '{}': value type mismatch. Expected: {:?}, Got: {:?}",
                name, expected_type, value_type
            );
        }
    }
}