1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::prom_store::remote::read_request::ResponseType;
19use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
20use api::v1::RowInsertRequests;
21use async_trait::async_trait;
22use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
23use client::OutputData;
24use common_catalog::format_full_table_name;
25use common_error::ext::BoxedError;
26use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
27use common_query::Output;
28use common_recordbatch::RecordBatches;
29use common_telemetry::{debug, tracing};
30use operator::insert::InserterRef;
31use operator::statement::StatementExecutor;
32use prost::Message;
33use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
34use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
35use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
36use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
37use servers::prom_store::{self, Metrics};
38use servers::query_handler::{
39 PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
40};
41use session::context::QueryContextRef;
42use snafu::{OptionExt, ResultExt};
43
44use crate::error::{
45 CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
46 TableNotFoundSnafu,
47};
48use crate::instance::Instance;
49
/// Raw `i32` value of [`ResponseType::Samples`], the form in which response types are compared
/// by `is_supported`.
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
51
/// Tells whether this server can answer a remote read with the given response type.
///
/// `response_type` is the raw `i32` value of a response type; only the value equal to
/// `SAMPLES_RESPONSE_TYPE` is accepted.
#[inline]
fn is_supported(response_type: i32) -> bool {
    matches!(response_type, SAMPLES_RESPONSE_TYPE)
}
57
58fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
64 if accepted_response_types.is_empty() {
65 return Ok(ResponseType::Samples);
66 }
67
68 let response_type = accepted_response_types
69 .iter()
70 .find(|t| is_supported(**t))
71 .with_context(|| error::NotSupportedSnafu {
72 feat: format!(
73 "server does not support any of the requested response types: {accepted_response_types:?}",
74 ),
75 })?;
76
77 Ok(ResponseType::try_from(*response_type).unwrap())
79}
80
81async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
82 let OutputData::Stream(stream) = output.data else {
83 unreachable!()
84 };
85 let recordbatches = RecordBatches::try_collect(stream)
86 .await
87 .context(error::CollectRecordbatchSnafu)?;
88 Ok(QueryResult {
89 timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
90 })
91}
92
93impl Instance {
94 #[tracing::instrument(skip_all)]
95 async fn handle_remote_query(
96 &self,
97 ctx: &QueryContextRef,
98 catalog_name: &str,
99 schema_name: &str,
100 table_name: &str,
101 query: &Query,
102 ) -> Result<Output> {
103 let table = self
104 .catalog_manager
105 .table(catalog_name, schema_name, table_name, Some(ctx))
106 .await
107 .context(CatalogSnafu)?
108 .with_context(|| TableNotFoundSnafu {
109 table_name: format_full_table_name(catalog_name, schema_name, table_name),
110 })?;
111
112 let dataframe = self
113 .query_engine
114 .read_table(table)
115 .with_context(|_| ReadTableSnafu {
116 table_name: format_full_table_name(catalog_name, schema_name, table_name),
117 })?;
118
119 let logical_plan =
120 prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
121
122 debug!(
123 "Prometheus remote read, table: {}, logical plan: {}",
124 table_name,
125 logical_plan.display_indent(),
126 );
127
128 self.query_engine
129 .execute(logical_plan, ctx.clone())
130 .await
131 .context(ExecLogicalPlanSnafu)
132 }
133
134 #[tracing::instrument(skip_all)]
135 async fn handle_remote_queries(
136 &self,
137 ctx: QueryContextRef,
138 queries: &[Query],
139 ) -> ServerResult<Vec<(String, Output)>> {
140 let mut results = Vec::with_capacity(queries.len());
141
142 let catalog_name = ctx.current_catalog();
143 let schema_name = ctx.current_schema();
144
145 for query in queries {
146 let table_name = prom_store::table_name(query)?;
147
148 let output = self
149 .handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
150 .await
151 .map_err(BoxedError::new)
152 .context(error::ExecuteQuerySnafu)?;
153
154 results.push((table_name, output));
155 }
156 Ok(results)
157 }
158}
159
160#[async_trait]
161impl PromStoreProtocolHandler for Instance {
162 async fn write(
163 &self,
164 request: RowInsertRequests,
165 ctx: QueryContextRef,
166 with_metric_engine: bool,
167 ) -> ServerResult<Output> {
168 self.plugins
169 .get::<PermissionCheckerRef>()
170 .as_ref()
171 .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
172 .context(AuthSnafu)?;
173 let interceptor_ref = self
174 .plugins
175 .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
176 interceptor_ref.pre_write(&request, ctx.clone())?;
177
178 let _guard = if let Some(limiter) = &self.limiter {
179 let result = limiter.limit_row_inserts(&request);
180 if result.is_none() {
181 return InFlightWriteBytesExceededSnafu.fail();
182 }
183 result
184 } else {
185 None
186 };
187
188 let output = if with_metric_engine {
189 let physical_table = ctx
190 .extension(PHYSICAL_TABLE_PARAM)
191 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
192 .to_string();
193 self.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
194 .await
195 .map_err(BoxedError::new)
196 .context(error::ExecuteGrpcQuerySnafu)?
197 } else {
198 self.handle_row_inserts(request, ctx.clone())
199 .await
200 .map_err(BoxedError::new)
201 .context(error::ExecuteGrpcQuerySnafu)?
202 };
203
204 Ok(output)
205 }
206
207 async fn read(
208 &self,
209 request: ReadRequest,
210 ctx: QueryContextRef,
211 ) -> ServerResult<PromStoreResponse> {
212 self.plugins
213 .get::<PermissionCheckerRef>()
214 .as_ref()
215 .check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
216 .context(AuthSnafu)?;
217 let interceptor_ref = self
218 .plugins
219 .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
220 interceptor_ref.pre_read(&request, ctx.clone())?;
221
222 let response_type = negotiate_response_type(&request.accepted_response_types)?;
223
224 let results = self.handle_remote_queries(ctx, &request.queries).await?;
226
227 match response_type {
228 ResponseType::Samples => {
229 let mut query_results = Vec::with_capacity(results.len());
230 let mut map = HashMap::new();
231 for (table_name, output) in results {
232 let plan = output.meta.plan.clone();
233 query_results.push(to_query_result(&table_name, output).await?);
234 if let Some(ref plan) = plan {
235 collect_plan_metrics(plan, &mut [&mut map]);
236 }
237 }
238
239 let response = ReadResponse {
240 results: query_results,
241 };
242
243 let resp_metrics = map
244 .into_iter()
245 .map(|(k, v)| (k, v.into()))
246 .collect::<HashMap<_, _>>();
247
248 Ok(PromStoreResponse {
250 content_type: CONTENT_TYPE_PROTOBUF.clone(),
251 content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
252 resp_metrics,
253 body: prom_store::snappy_compress(&response.encode_to_vec())?,
254 })
255 }
256 ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
257 feat: "streamed remote read",
258 }
259 .fail(),
260 }
261 }
262
263 async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
264 todo!();
265 }
266}
267
/// A [`PromStoreProtocolHandler`] that only serves writes: rows are forwarded to `inserter` as
/// metric row inserts into the `GREPTIME_PHYSICAL_TABLE` physical table. Its `read` and
/// `ingest_metrics` implementations are marked unreachable.
///
/// NOTE(review): the name suggests this backs the export of the system's own metrics —
/// confirm against the callers of `new_handler`.
pub struct ExportMetricHandler {
    /// Performs the metric row inserts.
    inserter: InserterRef,
    /// Handed to the inserter on every write.
    statement_executor: Arc<StatementExecutor>,
}
276
277impl ExportMetricHandler {
278 pub fn new_handler(
279 inserter: InserterRef,
280 statement_executor: Arc<StatementExecutor>,
281 ) -> PromStoreProtocolHandlerRef {
282 Arc::new(Self {
283 inserter,
284 statement_executor,
285 })
286 }
287}
288
289#[async_trait]
290impl PromStoreProtocolHandler for ExportMetricHandler {
291 async fn write(
292 &self,
293 request: RowInsertRequests,
294 ctx: QueryContextRef,
295 _: bool,
296 ) -> ServerResult<Output> {
297 self.inserter
298 .handle_metric_row_inserts(
299 request,
300 ctx,
301 &self.statement_executor,
302 GREPTIME_PHYSICAL_TABLE.to_string(),
303 )
304 .await
305 .map_err(BoxedError::new)
306 .context(error::ExecuteGrpcQuerySnafu)
307 }
308
309 async fn read(
310 &self,
311 _request: ReadRequest,
312 _ctx: QueryContextRef,
313 ) -> ServerResult<PromStoreResponse> {
314 unreachable!();
315 }
316
317 async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
318 unreachable!();
319 }
320}