frontend/instance/prom_store.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::OutputData;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
use servers::prom_store::{self, Metrics};
use servers::query_handler::{
    PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

use crate::error::{
    CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
    TableNotFoundSnafu,
};
use crate::instance::Instance;

const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;

/// Returns true if the given remote read response type is supported.
#[inline]
fn is_supported(response_type: i32) -> bool {
    // Only the SAMPLES response type is supported right now.
    response_type == SAMPLES_RESPONSE_TYPE
}

/// Negotiates the response type of the remote read response.
///
/// Response types are considered in the order they appear in the request
/// (FIFO). If the server implements none of the types in
/// `accepted_response_types`, an error is returned. For requests that do not
/// set the `accepted_response_types` field, the `SAMPLES` response type is
/// used.
fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
    if accepted_response_types.is_empty() {
        return Ok(ResponseType::Samples);
    }

    let response_type = accepted_response_types
        .iter()
        .find(|t| is_supported(**t))
        .with_context(|| error::NotSupportedSnafu {
            feat: format!(
                "server does not support any of the requested response types: {accepted_response_types:?}",
            ),
        })?;

    // It's safe to unwrap here; we know it must be SAMPLES_RESPONSE_TYPE.
    Ok(ResponseType::try_from(*response_type).unwrap())
}

/// Collects the output stream into record batches and converts them into a
/// Prometheus remote read [`QueryResult`] for the given table.
async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
    // Remote read queries always produce a stream output.
    let OutputData::Stream(stream) = output.data else {
        unreachable!()
    };
    let recordbatches = RecordBatches::try_collect(stream)
        .await
        .context(error::CollectRecordbatchSnafu)?;
    Ok(QueryResult {
        timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
    })
}

impl Instance {
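    /// Executes a single Prometheus remote read query against one table and
    /// returns the query output.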
    #[tracing::instrument(skip_all)]
    async fn handle_remote_query(
        &self,
        ctx: &QueryContextRef,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
        query: &Query,
    ) -> Result<Output> {
        let table = self
            .catalog_manager
            .table(catalog_name, schema_name, table_name, Some(ctx))
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: format_full_table_name(catalog_name, schema_name, table_name),
            })?;

        let dataframe = self
            .query_engine
            .read_table(table)
            .with_context(|_| ReadTableSnafu {
                table_name: format_full_table_name(catalog_name, schema_name, table_name),
            })?;

        let logical_plan =
            prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;

        debug!(
            "Prometheus remote read, table: {}, logical plan: {}",
            table_name,
            logical_plan.display_indent(),
        );

        self.query_engine
            .execute(logical_plan, ctx.clone())
            .await
            .context(ExecLogicalPlanSnafu)
    }

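    /// Runs each remote read query against its target table under the current
    /// catalog and schema, returning `(table_name, output)` pairs.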
    #[tracing::instrument(skip_all)]
    async fn handle_remote_queries(
        &self,
        ctx: QueryContextRef,
        queries: &[Query],
    ) -> ServerResult<Vec<(String, Output)>> {
        let mut results = Vec::with_capacity(queries.len());

        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();

        for query in queries {
            let table_name = prom_store::table_name(query)?;

            let output = self
                .handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
                .await
                .map_err(BoxedError::new)
                .context(error::ExecuteQuerySnafu)?;

            results.push((table_name, output));
        }
        Ok(results)
    }
}

#[async_trait]
impl PromStoreProtocolHandler for Instance {
    async fn write(
        &self,
        request: RowInsertRequests,
        ctx: QueryContextRef,
        with_metric_engine: bool,
    ) -> ServerResult<Output> {
        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
            .context(AuthSnafu)?;
        let interceptor_ref = self
            .plugins
            .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
        interceptor_ref.pre_write(&request, ctx.clone())?;

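        // Hold an in-flight write-bytes permit while this request is being
        // handled; reject the write if the configured limit would be exceeded.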
        let _guard = if let Some(limiter) = &self.limiter {
            let result = limiter.limit_row_inserts(&request);
            if result.is_none() {
                return InFlightWriteBytesExceededSnafu.fail();
            }
            result
        } else {
            None
        };

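        // With the metric engine, rows are routed through the physical table
        // taken from the query context (falling back to the default greptime
        // physical table); otherwise they are inserted as ordinary rows.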
        let output = if with_metric_engine {
            let physical_table = ctx
                .extension(PHYSICAL_TABLE_PARAM)
                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
                .to_string();
            self.handle_metric_row_inserts(request, ctx.clone(), physical_table)
                .await
                .map_err(BoxedError::new)
                .context(error::ExecuteGrpcQuerySnafu)?
        } else {
            self.handle_row_inserts(request, ctx.clone())
                .await
                .map_err(BoxedError::new)
                .context(error::ExecuteGrpcQuerySnafu)?
        };

        Ok(output)
    }

    async fn read(
        &self,
        request: ReadRequest,
        ctx: QueryContextRef,
    ) -> ServerResult<PromStoreResponse> {
        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
            .context(AuthSnafu)?;
        let interceptor_ref = self
            .plugins
            .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
        interceptor_ref.pre_read(&request, ctx.clone())?;

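        // Agree on a response encoding with the client before executing the
        // queries.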
        let response_type = negotiate_response_type(&request.accepted_response_types)?;

        // TODO(dennis): use read_hints to speed up the query if possible
        let results = self.handle_remote_queries(ctx, &request.queries).await?;

        match response_type {
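            // Convert each query output into Prometheus timeseries, collecting
            // per-plan execution metrics along the way.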
            ResponseType::Samples => {
                let mut query_results = Vec::with_capacity(results.len());
                let mut map = HashMap::new();
                for (table_name, output) in results {
                    let plan = output.meta.plan.clone();
                    query_results.push(to_query_result(&table_name, output).await?);
                    if let Some(ref plan) = plan {
                        collect_plan_metrics(plan, &mut [&mut map]);
                    }
                }

                let response = ReadResponse {
                    results: query_results,
                };

                let resp_metrics = map
                    .into_iter()
                    .map(|(k, v)| (k, v.into()))
                    .collect::<HashMap<_, _>>();

                // TODO(dennis): this may consume too much memory; add flow control
                Ok(PromStoreResponse {
                    content_type: CONTENT_TYPE_PROTOBUF.clone(),
                    content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
                    resp_metrics,
                    body: prom_store::snappy_compress(&response.encode_to_vec())?,
                })
            }
            ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
                feat: "streamed remote read",
            }
            .fail(),
        }
    }

    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
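        // Not implemented for the frontend instance yet.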
        todo!();
    }
}

/// This handler is mainly used by `frontend` or `standalone` instances to
/// directly ingest the metrics they collect themselves, avoiding the need to
/// send those metrics over the network and thus saving compression and
/// transmission overhead. Consequently, only the
/// `PromStoreProtocolHandler::write` method is implemented.
pub struct ExportMetricHandler {
    inserter: InserterRef,
    statement_executor: Arc<StatementExecutor>,
}

impl ExportMetricHandler {
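    /// Creates an [`ExportMetricHandler`] and returns it as a
    /// [`PromStoreProtocolHandlerRef`] trait object.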
    pub fn new_handler(
        inserter: InserterRef,
        statement_executor: Arc<StatementExecutor>,
    ) -> PromStoreProtocolHandlerRef {
        Arc::new(Self {
            inserter,
            statement_executor,
        })
    }
}

#[async_trait]
impl PromStoreProtocolHandler for ExportMetricHandler {
    async fn write(
        &self,
        request: RowInsertRequests,
        ctx: QueryContextRef,
        _: bool,
    ) -> ServerResult<Output> {
        self.inserter
            .handle_metric_row_inserts(
                request,
                ctx,
                &self.statement_executor,
                GREPTIME_PHYSICAL_TABLE.to_string(),
            )
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)
    }

    async fn read(
        &self,
        _request: ReadRequest,
        _ctx: QueryContextRef,
    ) -> ServerResult<PromStoreResponse> {
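        // This handler is write-only (see the struct-level docs); remote reads
        // go through `Instance` instead.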
        unreachable!();
    }

    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
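        // Likewise never called: this handler only implements `write`.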
        unreachable!();
    }
}
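
// A minimal sanity check for the response-type negotiation above; an
// illustrative sketch rather than an exhaustive test suite.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_negotiate_response_type() {
        // An empty accepted list falls back to SAMPLES.
        assert!(matches!(
            negotiate_response_type(&[]),
            Ok(ResponseType::Samples)
        ));
        // A supported type is selected when requested.
        assert!(matches!(
            negotiate_response_type(&[SAMPLES_RESPONSE_TYPE]),
            Ok(ResponseType::Samples)
        ));
        // A list containing only unsupported types is rejected.
        assert!(negotiate_response_type(&[ResponseType::StreamedXorChunks as i32]).is_err());
    }
}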