frontend/instance/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::prom_store::remote::read_request::ResponseType;
19use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
20use api::v1::RowInsertRequests;
21use async_trait::async_trait;
22use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
23use client::OutputData;
24use common_catalog::format_full_table_name;
25use common_error::ext::BoxedError;
26use common_query::Output;
27use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
28use common_recordbatch::RecordBatches;
29use common_telemetry::{debug, tracing};
30use operator::insert::InserterRef;
31use operator::statement::StatementExecutor;
32use prost::Message;
33use servers::error::{self, AuthSnafu, Result as ServerResult};
34use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics};
35use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
36use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
37use servers::prom_store::{self, Metrics};
38use servers::query_handler::{
39    PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
40};
41use session::context::QueryContextRef;
42use snafu::{OptionExt, ResultExt};
43use tracing::instrument;
44
45use crate::error::{
46    CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
47    TableNotFoundSnafu,
48};
49use crate::instance::Instance;
50
51const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
52
53#[inline]
54fn is_supported(response_type: i32) -> bool {
55    // Only supports samples response right now
56    response_type == SAMPLES_RESPONSE_TYPE
57}
58
59/// Negotiating the content type of the remote read response.
60///
61/// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
62/// implemented by server, error is returned.
63/// For request that do not contain `accepted_response_types` field the SAMPLES response type will be used.
64fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
65    if accepted_response_types.is_empty() {
66        return Ok(ResponseType::Samples);
67    }
68
69    let response_type = accepted_response_types
70        .iter()
71        .find(|t| is_supported(**t))
72        .with_context(|| error::NotSupportedSnafu {
73            feat: format!(
74                "server does not support any of the requested response types: {accepted_response_types:?}",
75            ),
76        })?;
77
78    // It's safe to unwrap here, we known that it should be SAMPLES_RESPONSE_TYPE
79    Ok(ResponseType::try_from(*response_type).unwrap())
80}
81
82#[instrument(skip_all, fields(table_name))]
83async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
84    let OutputData::Stream(stream) = output.data else {
85        unreachable!()
86    };
87    let recordbatches = RecordBatches::try_collect(stream)
88        .await
89        .context(error::CollectRecordbatchSnafu)?;
90    Ok(QueryResult {
91        timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
92    })
93}
94
95impl Instance {
96    #[tracing::instrument(skip_all)]
97    async fn handle_remote_query(
98        &self,
99        ctx: &QueryContextRef,
100        catalog_name: &str,
101        schema_name: &str,
102        table_name: &str,
103        query: &Query,
104    ) -> Result<Output> {
105        let table = self
106            .catalog_manager
107            .table(catalog_name, schema_name, table_name, Some(ctx))
108            .await
109            .context(CatalogSnafu)?
110            .with_context(|| TableNotFoundSnafu {
111                table_name: format_full_table_name(catalog_name, schema_name, table_name),
112            })?;
113
114        let dataframe = self
115            .query_engine
116            .read_table(table)
117            .with_context(|_| ReadTableSnafu {
118                table_name: format_full_table_name(catalog_name, schema_name, table_name),
119            })?;
120
121        let logical_plan =
122            prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
123
124        debug!(
125            "Prometheus remote read, table: {}, logical plan: {}",
126            table_name,
127            logical_plan.display_indent(),
128        );
129
130        self.query_engine
131            .execute(logical_plan, ctx.clone())
132            .await
133            .context(ExecLogicalPlanSnafu)
134    }
135
136    #[tracing::instrument(skip_all)]
137    async fn handle_remote_queries(
138        &self,
139        ctx: QueryContextRef,
140        queries: &[Query],
141    ) -> ServerResult<Vec<(String, Output)>> {
142        let mut results = Vec::with_capacity(queries.len());
143
144        let catalog_name = ctx.current_catalog();
145        let schema_name = ctx.current_schema();
146
147        for query in queries {
148            let table_name = prom_store::table_name(query)?;
149
150            let output = self
151                .handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
152                .await
153                .map_err(BoxedError::new)
154                .context(error::ExecuteQuerySnafu)?;
155
156            results.push((table_name, output));
157        }
158        Ok(results)
159    }
160}
161
162#[async_trait]
163impl PromStoreProtocolHandler for Instance {
164    async fn write(
165        &self,
166        request: RowInsertRequests,
167        ctx: QueryContextRef,
168        with_metric_engine: bool,
169    ) -> ServerResult<Output> {
170        self.plugins
171            .get::<PermissionCheckerRef>()
172            .as_ref()
173            .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
174            .context(AuthSnafu)?;
175        let interceptor_ref = self
176            .plugins
177            .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
178        interceptor_ref.pre_write(&request, ctx.clone())?;
179
180        let output = if with_metric_engine {
181            let physical_table = ctx
182                .extension(PHYSICAL_TABLE_PARAM)
183                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
184                .to_string();
185            self.handle_metric_row_inserts(request, ctx.clone(), physical_table.clone())
186                .await
187                .map_err(BoxedError::new)
188                .context(error::ExecuteGrpcQuerySnafu)?
189        } else {
190            self.handle_row_inserts(request, ctx.clone(), true, true)
191                .await
192                .map_err(BoxedError::new)
193                .context(error::ExecuteGrpcQuerySnafu)?
194        };
195
196        Ok(output)
197    }
198
199    #[instrument(skip_all, fields(table_name))]
200    async fn read(
201        &self,
202        request: ReadRequest,
203        ctx: QueryContextRef,
204    ) -> ServerResult<PromStoreResponse> {
205        self.plugins
206            .get::<PermissionCheckerRef>()
207            .as_ref()
208            .check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
209            .context(AuthSnafu)?;
210        let interceptor_ref = self
211            .plugins
212            .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
213        interceptor_ref.pre_read(&request, ctx.clone())?;
214
215        let response_type = negotiate_response_type(&request.accepted_response_types)?;
216
217        // TODO(dennis): use read_hints to speedup query if possible
218        let results = self.handle_remote_queries(ctx, &request.queries).await?;
219
220        match response_type {
221            ResponseType::Samples => {
222                let mut query_results = Vec::with_capacity(results.len());
223                let mut map = HashMap::new();
224                for (table_name, output) in results {
225                    let plan = output.meta.plan.clone();
226                    query_results.push(to_query_result(&table_name, output).await?);
227                    if let Some(ref plan) = plan {
228                        collect_plan_metrics(plan, &mut [&mut map]);
229                    }
230                }
231
232                let response = ReadResponse {
233                    results: query_results,
234                };
235
236                let resp_metrics = map
237                    .into_iter()
238                    .map(|(k, v)| (k, v.into()))
239                    .collect::<HashMap<_, _>>();
240
241                // TODO(dennis): may consume too much memory, adds flow control
242                Ok(PromStoreResponse {
243                    content_type: CONTENT_TYPE_PROTOBUF.clone(),
244                    content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
245                    resp_metrics,
246                    body: prom_store::snappy_compress(&response.encode_to_vec())?,
247                })
248            }
249            ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
250                feat: "streamed remote read",
251            }
252            .fail(),
253        }
254    }
255
256    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
257        todo!();
258    }
259}
260
261/// This handler is mainly used for `frontend` or `standalone` to directly import
262/// the metrics collected by itself, thereby avoiding importing metrics through the network,
263/// thus reducing compression and network transmission overhead,
264/// so only implement `PromStoreProtocolHandler::write` method.
265pub struct ExportMetricHandler {
266    inserter: InserterRef,
267    statement_executor: Arc<StatementExecutor>,
268}
269
270impl ExportMetricHandler {
271    pub fn new_handler(
272        inserter: InserterRef,
273        statement_executor: Arc<StatementExecutor>,
274    ) -> PromStoreProtocolHandlerRef {
275        Arc::new(Self {
276            inserter,
277            statement_executor,
278        })
279    }
280}
281
282#[async_trait]
283impl PromStoreProtocolHandler for ExportMetricHandler {
284    async fn write(
285        &self,
286        request: RowInsertRequests,
287        ctx: QueryContextRef,
288        _: bool,
289    ) -> ServerResult<Output> {
290        self.inserter
291            .handle_metric_row_inserts(
292                request,
293                ctx,
294                &self.statement_executor,
295                GREPTIME_PHYSICAL_TABLE.to_string(),
296            )
297            .await
298            .map_err(BoxedError::new)
299            .context(error::ExecuteGrpcQuerySnafu)
300    }
301
302    async fn read(
303        &self,
304        _request: ReadRequest,
305        _ctx: QueryContextRef,
306    ) -> ServerResult<PromStoreResponse> {
307        unreachable!();
308    }
309
310    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
311        unreachable!();
312    }
313}