servers/grpc/
prom_query_gateway.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! PrometheusGateway provides a gRPC interface to query Prometheus metrics
16//! by PromQL. The behavior is similar to the Prometheus HTTP API.
17
18use std::sync::Arc;
19
20use api::v1::prometheus_gateway_server::PrometheusGateway;
21use api::v1::promql_request::Promql;
22use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
23use async_trait::async_trait;
24use auth::UserProviderRef;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_time::util::current_time_rfc3339;
28use promql_parser::parser::value::ValueType;
29use query::parser::PromQuery;
30use session::context::{Channel, QueryContext};
31use snafu::OptionExt;
32use tonic::{Request, Response};
33
34use crate::error::InvalidQuerySnafu;
35use crate::grpc::TonicResult;
36use crate::grpc::context_auth::auth;
37use crate::grpc::greptime_handler::create_query_context;
38use crate::http::prometheus::{PrometheusJsonResponse, retrieve_metric_name_and_result_type};
39use crate::prometheus_handler::PrometheusHandlerRef;
40
41pub struct PrometheusGatewayService {
42    handler: PrometheusHandlerRef,
43    user_provider: Option<UserProviderRef>,
44}
45
46#[async_trait]
47impl PrometheusGateway for PrometheusGatewayService {
48    async fn handle(&self, req: Request<PromqlRequest>) -> TonicResult<Response<PromqlResponse>> {
49        let mut is_range_query = false;
50        let inner = req.into_inner();
51        let prom_query = match inner.promql.context(InvalidQuerySnafu {
52            reason: "Expecting non-empty PromqlRequest.",
53        })? {
54            Promql::RangeQuery(range_query) => {
55                is_range_query = true;
56                PromQuery {
57                    query: range_query.query,
58                    start: range_query.start,
59                    end: range_query.end,
60                    step: range_query.step,
61                    lookback: range_query.lookback,
62                    alias: None,
63                }
64            }
65            Promql::InstantQuery(instant_query) => {
66                let time = if instant_query.time.is_empty() {
67                    current_time_rfc3339()
68                } else {
69                    instant_query.time
70                };
71                PromQuery {
72                    query: instant_query.query,
73                    start: time.clone(),
74                    end: time,
75                    step: String::from("1s"),
76                    lookback: instant_query.lookback,
77                    alias: None,
78                }
79            }
80        };
81
82        let header = inner.header.as_ref();
83        let query_ctx = create_query_context(Channel::Promql, header, Default::default())?;
84
85        let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
86        query_ctx.set_current_user(user_info);
87
88        let json_response = self
89            .handle_inner(prom_query, query_ctx, is_range_query)
90            .await;
91        let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
92
93        let response = Response::new(PromqlResponse {
94            header: Some(ResponseHeader {
95                status: Some(api::v1::Status {
96                    status_code: StatusCode::Success as _,
97                    ..Default::default()
98                }),
99            }),
100            body: json_bytes,
101        });
102        Ok(response)
103    }
104}
105
106impl PrometheusGatewayService {
107    pub fn new(handler: PrometheusHandlerRef, user_provider: Option<UserProviderRef>) -> Self {
108        Self {
109            handler,
110            user_provider,
111        }
112    }
113
114    async fn handle_inner(
115        &self,
116        query: PromQuery,
117        ctx: Arc<QueryContext>,
118        is_range_query: bool,
119    ) -> PrometheusJsonResponse {
120        let db = ctx.get_db_string();
121        let _timer = crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER
122            .with_label_values(&[db.as_str()])
123            .start_timer();
124
125        let result = self.handler.do_query(&query, ctx).await;
126        let (metric_name, mut result_type) =
127            match retrieve_metric_name_and_result_type(&query.query) {
128                Ok((metric_name, result_type)) => (metric_name, result_type),
129                Err(err) => {
130                    return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
131                }
132            };
133        // range query only returns matrix
134        if is_range_query {
135            result_type = ValueType::Matrix;
136        };
137
138        PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
139    }
140}