servers/grpc/
prom_query_gateway.rs1use std::sync::Arc;
19
20use api::v1::prometheus_gateway_server::PrometheusGateway;
21use api::v1::promql_request::Promql;
22use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
23use async_trait::async_trait;
24use auth::UserProviderRef;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_time::util::current_time_rfc3339;
28use promql_parser::parser::value::ValueType;
29use query::parser::PromQuery;
30use session::context::{Channel, QueryContext};
31use snafu::OptionExt;
32use tonic::{Request, Response};
33
34use crate::error::InvalidQuerySnafu;
35use crate::grpc::TonicResult;
36use crate::grpc::context_auth::auth;
37use crate::grpc::greptime_handler::create_query_context;
38use crate::http::prometheus::{PrometheusJsonResponse, retrieve_metric_name_and_result_type};
39use crate::prometheus_handler::PrometheusHandlerRef;
40
41pub struct PrometheusGatewayService {
42 handler: PrometheusHandlerRef,
43 user_provider: Option<UserProviderRef>,
44}
45
46#[async_trait]
47impl PrometheusGateway for PrometheusGatewayService {
48 async fn handle(&self, req: Request<PromqlRequest>) -> TonicResult<Response<PromqlResponse>> {
49 let mut is_range_query = false;
50 let inner = req.into_inner();
51 let prom_query = match inner.promql.context(InvalidQuerySnafu {
52 reason: "Expecting non-empty PromqlRequest.",
53 })? {
54 Promql::RangeQuery(range_query) => {
55 is_range_query = true;
56 PromQuery {
57 query: range_query.query,
58 start: range_query.start,
59 end: range_query.end,
60 step: range_query.step,
61 lookback: range_query.lookback,
62 alias: None,
63 }
64 }
65 Promql::InstantQuery(instant_query) => {
66 let time = if instant_query.time.is_empty() {
67 current_time_rfc3339()
68 } else {
69 instant_query.time
70 };
71 PromQuery {
72 query: instant_query.query,
73 start: time.clone(),
74 end: time,
75 step: String::from("1s"),
76 lookback: instant_query.lookback,
77 alias: None,
78 }
79 }
80 };
81
82 let header = inner.header.as_ref();
83 let query_ctx = create_query_context(
84 Channel::Promql,
85 header,
86 Default::default(),
87 Default::default(),
88 )?;
89
90 let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
91 query_ctx.set_current_user(user_info);
92
93 let json_response = self
94 .handle_inner(prom_query, query_ctx, is_range_query)
95 .await;
96 let json_bytes = serde_json::to_string(&json_response).unwrap().into_bytes();
97
98 let response = Response::new(PromqlResponse {
99 header: Some(ResponseHeader {
100 status: Some(api::v1::Status {
101 status_code: StatusCode::Success as _,
102 ..Default::default()
103 }),
104 }),
105 body: json_bytes,
106 });
107 Ok(response)
108 }
109}
110
111impl PrometheusGatewayService {
112 pub fn new(handler: PrometheusHandlerRef, user_provider: Option<UserProviderRef>) -> Self {
113 Self {
114 handler,
115 user_provider,
116 }
117 }
118
119 async fn handle_inner(
120 &self,
121 query: PromQuery,
122 ctx: Arc<QueryContext>,
123 is_range_query: bool,
124 ) -> PrometheusJsonResponse {
125 let db = ctx.get_db_string();
126 let _timer = crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER
127 .with_label_values(&[db.as_str()])
128 .start_timer();
129
130 let result = self.handler.do_query(&query, ctx).await;
131 let (metric_name, mut result_type) =
132 match retrieve_metric_name_and_result_type(&query.query) {
133 Ok((metric_name, result_type)) => (metric_name, result_type),
134 Err(err) => {
135 return PrometheusJsonResponse::error(err.status_code(), err.output_msg());
136 }
137 };
138 if is_range_query {
140 result_type = ValueType::Matrix;
141 };
142
143 PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
144 }
145}