servers/grpc/
prom_query_gateway.rs1use std::sync::Arc;
19
20use api::v1::prometheus_gateway_server::PrometheusGateway;
21use api::v1::promql_request::Promql;
22use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
23use async_trait::async_trait;
24use auth::UserProviderRef;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_time::util::current_time_rfc3339;
28use promql_parser::parser::value::ValueType;
29use query::parser::PromQuery;
30use session::context::QueryContext;
31use snafu::OptionExt;
32use tonic::{Request, Response};
33
34use crate::error::InvalidQuerySnafu;
35use crate::grpc::greptime_handler::{auth, create_query_context};
36use crate::grpc::TonicResult;
37use crate::http::prometheus::{retrieve_metric_name_and_result_type, PrometheusJsonResponse};
38use crate::prometheus_handler::PrometheusHandlerRef;
39
40pub struct PrometheusGatewayService {
41 handler: PrometheusHandlerRef,
42 user_provider: Option<UserProviderRef>,
43}
44
45#[async_trait]
46impl PrometheusGateway for PrometheusGatewayService {
47 async fn handle(&self, req: Request<PromqlRequest>) -> TonicResult<Response<PromqlResponse>> {
48 let mut is_range_query = false;
49 let inner = req.into_inner();
50 let prom_query = match inner.promql.context(InvalidQuerySnafu {
51 reason: "Expecting non-empty PromqlRequest.",
52 })? {
53 Promql::RangeQuery(range_query) => {
54 is_range_query = true;
55 PromQuery {
56 query: range_query.query,
57 start: range_query.start,
58 end: range_query.end,
59 step: range_query.step,
60 lookback: range_query.lookback,
61 }
62 }
63 Promql::InstantQuery(instant_query) => {
64 let time = if instant_query.time.is_empty() {
65 current_time_rfc3339()
66 } else {
67 instant_query.time
68 };
69 PromQuery {
70 query: instant_query.query,
71 start: time.clone(),
72 end: time,
73 step: String::from("1s"),
74 lookback: instant_query.lookback,
75 }
76 }
77 };
78
79 let header = inner.header.as_ref();
80 let query_ctx = create_query_context(header, Default::default())?;
81 let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
82 query_ctx.set_current_user(user_info);
83
84 let json_response = self
85 .handle_inner(prom_query, query_ctx, is_range_query)
86 .await;
87 let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
88
89 let response = Response::new(PromqlResponse {
90 header: Some(ResponseHeader {
91 status: Some(api::v1::Status {
92 status_code: StatusCode::Success as _,
93 ..Default::default()
94 }),
95 }),
96 body: json_bytes,
97 });
98 Ok(response)
99 }
100}
101
102impl PrometheusGatewayService {
103 pub fn new(handler: PrometheusHandlerRef, user_provider: Option<UserProviderRef>) -> Self {
104 Self {
105 handler,
106 user_provider,
107 }
108 }
109
110 async fn handle_inner(
111 &self,
112 query: PromQuery,
113 ctx: Arc<QueryContext>,
114 is_range_query: bool,
115 ) -> PrometheusJsonResponse {
116 let db = ctx.get_db_string();
117 let _timer = crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER
118 .with_label_values(&[db.as_str()])
119 .start_timer();
120
121 let result = self.handler.do_query(&query, ctx).await;
122 let (metric_name, mut result_type) =
123 match retrieve_metric_name_and_result_type(&query.query) {
124 Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
125 Err(err) => {
126 return PrometheusJsonResponse::error(err.status_code(), err.output_msg())
127 }
128 };
129 if is_range_query {
131 result_type = ValueType::Matrix;
132 };
133
134 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
135 }
136}