1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::prom_store::remote::read_request::ResponseType;
19use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
20use api::v1::RowInsertRequests;
21use async_trait::async_trait;
22use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
23use client::OutputData;
24use common_catalog::format_full_table_name;
25use common_error::ext::BoxedError;
26use common_query::Output;
27use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
28use common_recordbatch::RecordBatches;
29use common_telemetry::{debug, tracing};
30use operator::insert::InserterRef;
31use operator::statement::StatementExecutor;
32use prost::Message;
33use servers::error::{self, AuthSnafu, Result as ServerResult};
34use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics};
35use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
36use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
37use servers::prom_store::{self, Metrics};
38use servers::query_handler::{
39 PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
40};
41use session::context::QueryContextRef;
42use snafu::{OptionExt, ResultExt};
43use tracing::instrument;
44
45use crate::error::{
46 CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
47 TableNotFoundSnafu,
48};
49use crate::instance::Instance;
50
51const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
52
53#[inline]
54fn is_supported(response_type: i32) -> bool {
55 response_type == SAMPLES_RESPONSE_TYPE
57}
58
59fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
65 if accepted_response_types.is_empty() {
66 return Ok(ResponseType::Samples);
67 }
68
69 let response_type = accepted_response_types
70 .iter()
71 .find(|t| is_supported(**t))
72 .with_context(|| error::NotSupportedSnafu {
73 feat: format!(
74 "server does not support any of the requested response types: {accepted_response_types:?}",
75 ),
76 })?;
77
78 Ok(ResponseType::try_from(*response_type).unwrap())
80}
81
82#[instrument(skip_all, fields(table_name))]
83async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
84 let OutputData::Stream(stream) = output.data else {
85 unreachable!()
86 };
87 let recordbatches = RecordBatches::try_collect(stream)
88 .await
89 .context(error::CollectRecordbatchSnafu)?;
90 Ok(QueryResult {
91 timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
92 })
93}
94
95impl Instance {
96 #[tracing::instrument(skip_all)]
97 async fn handle_remote_query(
98 &self,
99 ctx: &QueryContextRef,
100 catalog_name: &str,
101 schema_name: &str,
102 table_name: &str,
103 query: &Query,
104 ) -> Result<Output> {
105 let table = self
106 .catalog_manager
107 .table(catalog_name, schema_name, table_name, Some(ctx))
108 .await
109 .context(CatalogSnafu)?
110 .with_context(|| TableNotFoundSnafu {
111 table_name: format_full_table_name(catalog_name, schema_name, table_name),
112 })?;
113
114 let dataframe = self
115 .query_engine
116 .read_table(table)
117 .with_context(|_| ReadTableSnafu {
118 table_name: format_full_table_name(catalog_name, schema_name, table_name),
119 })?;
120
121 let logical_plan =
122 prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
123
124 debug!(
125 "Prometheus remote read, table: {}, logical plan: {}",
126 table_name,
127 logical_plan.display_indent(),
128 );
129
130 self.query_engine
131 .execute(logical_plan, ctx.clone())
132 .await
133 .context(ExecLogicalPlanSnafu)
134 }
135
136 #[tracing::instrument(skip_all)]
137 async fn handle_remote_queries(
138 &self,
139 ctx: QueryContextRef,
140 queries: &[Query],
141 ) -> ServerResult<Vec<(String, Output)>> {
142 let mut results = Vec::with_capacity(queries.len());
143
144 let catalog_name = ctx.current_catalog();
145 let schema_name = ctx.current_schema();
146
147 for query in queries {
148 let table_name = prom_store::table_name(query)?;
149
150 let output = self
151 .handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
152 .await
153 .map_err(BoxedError::new)
154 .context(error::ExecuteQuerySnafu)?;
155
156 results.push((table_name, output));
157 }
158 Ok(results)
159 }
160}
161
162#[async_trait]
163impl PromStoreProtocolHandler for Instance {
164 async fn write(
165 &self,
166 request: RowInsertRequests,
167 ctx: QueryContextRef,
168 with_metric_engine: bool,
169 ) -> ServerResult<Output> {
170 self.plugins
171 .get::<PermissionCheckerRef>()
172 .as_ref()
173 .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
174 .context(AuthSnafu)?;
175 let interceptor_ref = self
176 .plugins
177 .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
178 interceptor_ref.pre_write(&request, ctx.clone())?;
179
180 let output = if with_metric_engine {
181 let physical_table = ctx
182 .extension(PHYSICAL_TABLE_PARAM)
183 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
184 .to_string();
185 self.handle_metric_row_inserts(request, ctx.clone(), physical_table.clone())
186 .await
187 .map_err(BoxedError::new)
188 .context(error::ExecuteGrpcQuerySnafu)?
189 } else {
190 self.handle_row_inserts(request, ctx.clone(), true, true)
191 .await
192 .map_err(BoxedError::new)
193 .context(error::ExecuteGrpcQuerySnafu)?
194 };
195
196 Ok(output)
197 }
198
199 #[instrument(skip_all, fields(table_name))]
200 async fn read(
201 &self,
202 request: ReadRequest,
203 ctx: QueryContextRef,
204 ) -> ServerResult<PromStoreResponse> {
205 self.plugins
206 .get::<PermissionCheckerRef>()
207 .as_ref()
208 .check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
209 .context(AuthSnafu)?;
210 let interceptor_ref = self
211 .plugins
212 .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
213 interceptor_ref.pre_read(&request, ctx.clone())?;
214
215 let response_type = negotiate_response_type(&request.accepted_response_types)?;
216
217 let results = self.handle_remote_queries(ctx, &request.queries).await?;
219
220 match response_type {
221 ResponseType::Samples => {
222 let mut query_results = Vec::with_capacity(results.len());
223 let mut map = HashMap::new();
224 for (table_name, output) in results {
225 let plan = output.meta.plan.clone();
226 query_results.push(to_query_result(&table_name, output).await?);
227 if let Some(ref plan) = plan {
228 collect_plan_metrics(plan, &mut [&mut map]);
229 }
230 }
231
232 let response = ReadResponse {
233 results: query_results,
234 };
235
236 let resp_metrics = map
237 .into_iter()
238 .map(|(k, v)| (k, v.into()))
239 .collect::<HashMap<_, _>>();
240
241 Ok(PromStoreResponse {
243 content_type: CONTENT_TYPE_PROTOBUF.clone(),
244 content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
245 resp_metrics,
246 body: prom_store::snappy_compress(&response.encode_to_vec())?,
247 })
248 }
249 ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
250 feat: "streamed remote read",
251 }
252 .fail(),
253 }
254 }
255
256 async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
257 todo!();
258 }
259}
260
261pub struct ExportMetricHandler {
266 inserter: InserterRef,
267 statement_executor: Arc<StatementExecutor>,
268}
269
270impl ExportMetricHandler {
271 pub fn new_handler(
272 inserter: InserterRef,
273 statement_executor: Arc<StatementExecutor>,
274 ) -> PromStoreProtocolHandlerRef {
275 Arc::new(Self {
276 inserter,
277 statement_executor,
278 })
279 }
280}
281
282#[async_trait]
283impl PromStoreProtocolHandler for ExportMetricHandler {
284 async fn write(
285 &self,
286 request: RowInsertRequests,
287 ctx: QueryContextRef,
288 _: bool,
289 ) -> ServerResult<Output> {
290 self.inserter
291 .handle_metric_row_inserts(
292 request,
293 ctx,
294 &self.statement_executor,
295 GREPTIME_PHYSICAL_TABLE.to_string(),
296 )
297 .await
298 .map_err(BoxedError::new)
299 .context(error::ExecuteGrpcQuerySnafu)
300 }
301
302 async fn read(
303 &self,
304 _request: ReadRequest,
305 _ctx: QueryContextRef,
306 ) -> ServerResult<PromStoreResponse> {
307 unreachable!();
308 }
309
310 async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
311 unreachable!();
312 }
313}