// frontend/instance/prom_store.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
use api::v1::alter_table_expr::Kind;
use api::v1::{
    AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef, CreateTableExpr,
    RowInsertRequests, SemanticType,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::OutputData;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_query::Output;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use operator::insert::{
    AutoCreateTableType, InserterRef, build_create_table_expr, fill_table_options_for_create,
};
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
use servers::pending_rows_batcher::PendingRowsSchemaAlterer;
use servers::prom_store::{self, Metrics};
use servers::query_handler::{
    PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::mito_engine_options::SST_FORMAT_KEY;
use table::table_reference::TableReference;
use tracing::instrument;

use crate::error::{
    CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
    TableNotFoundSnafu,
};
use crate::instance::Instance;
60
61const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
62
63fn auto_create_table_type_for_prom_remote_write(
64    ctx: &QueryContextRef,
65    with_metric_engine: bool,
66) -> AutoCreateTableType {
67    if with_metric_engine {
68        let physical_table = ctx
69            .extension(PHYSICAL_TABLE_PARAM)
70            .unwrap_or(GREPTIME_PHYSICAL_TABLE)
71            .to_string();
72        AutoCreateTableType::Logical(physical_table)
73    } else {
74        AutoCreateTableType::Physical
75    }
76}
77
78fn required_physical_table_for_create_type(create_type: &AutoCreateTableType) -> Option<&str> {
79    match create_type {
80        AutoCreateTableType::Logical(physical_table) => Some(physical_table.as_str()),
81        _ => None,
82    }
83}
84
85fn fill_metric_physical_table_options(table_options: &mut HashMap<String, String>) {
86    // We always enforce flat format in this ingestion path.
87    table_options.insert(SST_FORMAT_KEY.to_string(), "flat".to_string());
88    table_options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
89}
90
/// Whether this server implements the given remote-read response type.
#[inline]
fn is_supported(response_type: i32) -> bool {
    // The SAMPLES response type is the only one implemented so far.
    matches!(response_type, SAMPLES_RESPONSE_TYPE)
}
96
97/// Negotiating the content type of the remote read response.
98///
99/// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
100/// implemented by server, error is returned.
101/// For request that do not contain `accepted_response_types` field the SAMPLES response type will be used.
102fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
103    if accepted_response_types.is_empty() {
104        return Ok(ResponseType::Samples);
105    }
106
107    let response_type = accepted_response_types
108        .iter()
109        .find(|t| is_supported(**t))
110        .with_context(|| error::NotSupportedSnafu {
111            feat: format!(
112                "server does not support any of the requested response types: {accepted_response_types:?}",
113            ),
114        })?;
115
116    // It's safe to unwrap here, we known that it should be SAMPLES_RESPONSE_TYPE
117    Ok(ResponseType::try_from(*response_type).unwrap())
118}
119
120#[instrument(skip_all, fields(table_name))]
121async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
122    let OutputData::Stream(stream) = output.data else {
123        unreachable!()
124    };
125    let recordbatches = RecordBatches::try_collect(stream)
126        .await
127        .context(error::CollectRecordbatchSnafu)?;
128    Ok(QueryResult {
129        timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
130    })
131}
132
133impl Instance {
134    #[tracing::instrument(skip_all)]
135    async fn handle_remote_query(
136        &self,
137        ctx: &QueryContextRef,
138        catalog_name: &str,
139        schema_name: &str,
140        table_name: &str,
141        query: &Query,
142    ) -> Result<Output> {
143        let table = self
144            .catalog_manager
145            .table(catalog_name, schema_name, table_name, Some(ctx))
146            .await
147            .context(CatalogSnafu)?
148            .with_context(|| TableNotFoundSnafu {
149                table_name: format_full_table_name(catalog_name, schema_name, table_name),
150            })?;
151
152        let dataframe = self
153            .query_engine
154            .read_table(table)
155            .with_context(|_| ReadTableSnafu {
156                table_name: format_full_table_name(catalog_name, schema_name, table_name),
157            })?;
158
159        let logical_plan =
160            prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
161
162        debug!(
163            "Prometheus remote read, table: {}, logical plan: {}",
164            table_name,
165            logical_plan.display_indent(),
166        );
167
168        self.query_engine
169            .execute(logical_plan, ctx.clone())
170            .await
171            .context(ExecLogicalPlanSnafu)
172    }
173
174    #[tracing::instrument(skip_all)]
175    async fn handle_remote_queries(
176        &self,
177        ctx: QueryContextRef,
178        queries: &[Query],
179    ) -> ServerResult<Vec<(String, Output)>> {
180        let mut results = Vec::with_capacity(queries.len());
181
182        let catalog_name = ctx.current_catalog();
183        let schema_name = ctx.current_schema();
184
185        for query in queries {
186            let table_name = prom_store::table_name(query)?;
187
188            let output = self
189                .handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
190                .await
191                .map_err(BoxedError::new)
192                .context(error::ExecuteQuerySnafu)?;
193
194            results.push((table_name, output));
195        }
196        Ok(results)
197    }
198}
199
200#[async_trait]
201impl PendingRowsSchemaAlterer for Instance {
202    async fn create_tables_if_missing_batch(
203        &self,
204        catalog: &str,
205        schema: &str,
206        tables: &[(&str, &[api::v1::ColumnSchema])],
207        with_metric_engine: bool,
208        ctx: QueryContextRef,
209    ) -> ServerResult<()> {
210        if tables.is_empty() {
211            return Ok(());
212        }
213
214        let create_type = auto_create_table_type_for_prom_remote_write(&ctx, with_metric_engine);
215        if let Some(physical_table) = required_physical_table_for_create_type(&create_type) {
216            self.create_metric_physical_table_if_missing(
217                catalog,
218                schema,
219                physical_table,
220                ctx.clone(),
221            )
222            .await?;
223        }
224
225        let engine = if matches!(create_type, AutoCreateTableType::Logical(_)) {
226            METRIC_ENGINE_NAME
227        } else {
228            common_catalog::consts::default_engine()
229        };
230
231        // Check which tables actually still need to be created (may have been
232        // concurrently created by another request).
233        let mut create_exprs: Vec<CreateTableExpr> = Vec::with_capacity(tables.len());
234        for &(table_name, request_schema) in tables {
235            let existing = self
236                .catalog_manager()
237                .table(catalog, schema, table_name, Some(ctx.as_ref()))
238                .await
239                .map_err(BoxedError::new)
240                .context(error::ExecuteGrpcQuerySnafu)?;
241            if existing.is_some() {
242                continue;
243            }
244
245            let table_ref = TableReference::full(catalog, schema, table_name);
246            let mut create_table_expr = build_create_table_expr(&table_ref, request_schema, engine)
247                .map_err(BoxedError::new)
248                .context(error::ExecuteGrpcQuerySnafu)?;
249
250            let mut table_options = std::collections::HashMap::with_capacity(4);
251            fill_table_options_for_create(&mut table_options, &create_type, &ctx);
252            create_table_expr.table_options.extend(table_options);
253            create_exprs.push(create_table_expr);
254        }
255
256        if create_exprs.is_empty() {
257            return Ok(());
258        }
259
260        match create_type {
261            AutoCreateTableType::Logical(_) => {
262                // Use the batch API for logical tables.
263                self.statement_executor
264                    .create_logical_tables(&create_exprs, ctx)
265                    .await
266                    .map_err(BoxedError::new)
267                    .context(error::ExecuteGrpcQuerySnafu)?;
268            }
269            AutoCreateTableType::Physical => {
270                // Physical tables don't have a batch DDL path; create one at a time.
271                for mut expr in create_exprs {
272                    expr.table_options
273                        .insert(SST_FORMAT_KEY.to_string(), "flat".to_string());
274                    self.statement_executor
275                        .create_table_inner(&mut expr, None, ctx.clone())
276                        .await
277                        .map_err(BoxedError::new)
278                        .context(error::ExecuteGrpcQuerySnafu)?;
279                }
280            }
281            create_type => {
282                return error::InvalidPromRemoteRequestSnafu {
283                    msg: format!(
284                        "prom remote write only supports logical or physical auto-create: {}",
285                        create_type.as_str()
286                    ),
287                }
288                .fail();
289            }
290        }
291
292        Ok(())
293    }
294
295    async fn add_missing_prom_tag_columns_batch(
296        &self,
297        catalog: &str,
298        schema: &str,
299        tables: &[(&str, &[String])],
300        ctx: QueryContextRef,
301    ) -> ServerResult<()> {
302        if tables.is_empty() {
303            return Ok(());
304        }
305
306        let alter_exprs: Vec<AlterTableExpr> = tables
307            .iter()
308            .filter(|(_, columns)| !columns.is_empty())
309            .map(|&(table_name, columns)| {
310                let add_columns = AddColumns {
311                    add_columns: columns
312                        .iter()
313                        .map(|column_name| AddColumn {
314                            column_def: Some(ColumnDef {
315                                name: column_name.clone(),
316                                data_type: ColumnDataType::String as i32,
317                                is_nullable: true,
318                                semantic_type: SemanticType::Tag as i32,
319                                comment: String::new(),
320                                ..Default::default()
321                            }),
322                            location: None,
323                            add_if_not_exists: true,
324                        })
325                        .collect(),
326                };
327
328                AlterTableExpr {
329                    catalog_name: catalog.to_string(),
330                    schema_name: schema.to_string(),
331                    table_name: table_name.to_string(),
332                    kind: Some(Kind::AddColumns(add_columns)),
333                }
334            })
335            .collect();
336
337        if alter_exprs.is_empty() {
338            return Ok(());
339        }
340
341        self.statement_executor
342            .alter_logical_tables(alter_exprs, ctx)
343            .await
344            .map_err(BoxedError::new)
345            .context(error::ExecuteGrpcQuerySnafu)?;
346
347        Ok(())
348    }
349}
350
351#[async_trait]
352impl PromStoreProtocolHandler for Instance {
353    async fn pre_write(
354        &self,
355        request: &RowInsertRequests,
356        ctx: QueryContextRef,
357    ) -> ServerResult<()> {
358        self.plugins
359            .get::<PermissionCheckerRef>()
360            .as_ref()
361            .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
362            .context(AuthSnafu)?;
363        let interceptor_ref = self
364            .plugins
365            .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
366        interceptor_ref.pre_write(request, ctx)?;
367        Ok(())
368    }
369
370    async fn write(
371        &self,
372        request: RowInsertRequests,
373        ctx: QueryContextRef,
374        with_metric_engine: bool,
375    ) -> ServerResult<Output> {
376        self.pre_write(&request, ctx.clone()).await?;
377
378        let output = if with_metric_engine {
379            let physical_table = ctx
380                .extension(PHYSICAL_TABLE_PARAM)
381                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
382                .to_string();
383            self.handle_metric_row_inserts(request, ctx.clone(), physical_table.clone())
384                .await
385                .map_err(BoxedError::new)
386                .context(error::ExecuteGrpcQuerySnafu)?
387        } else {
388            self.handle_row_inserts(request, ctx.clone(), true, true)
389                .await
390                .map_err(BoxedError::new)
391                .context(error::ExecuteGrpcQuerySnafu)?
392        };
393
394        Ok(output)
395    }
396
397    #[instrument(skip_all, fields(table_name))]
398    async fn read(
399        &self,
400        request: ReadRequest,
401        ctx: QueryContextRef,
402    ) -> ServerResult<PromStoreResponse> {
403        self.plugins
404            .get::<PermissionCheckerRef>()
405            .as_ref()
406            .check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
407            .context(AuthSnafu)?;
408        let interceptor_ref = self
409            .plugins
410            .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
411        interceptor_ref.pre_read(&request, ctx.clone())?;
412
413        let response_type = negotiate_response_type(&request.accepted_response_types)?;
414
415        // TODO(dennis): use read_hints to speedup query if possible
416        let results = self.handle_remote_queries(ctx, &request.queries).await?;
417
418        match response_type {
419            ResponseType::Samples => {
420                let mut query_results = Vec::with_capacity(results.len());
421                let mut map = HashMap::new();
422                for (table_name, output) in results {
423                    let plan = output.meta.plan.clone();
424                    query_results.push(to_query_result(&table_name, output).await?);
425                    if let Some(ref plan) = plan {
426                        collect_plan_metrics(plan, &mut [&mut map]);
427                    }
428                }
429
430                let response = ReadResponse {
431                    results: query_results,
432                };
433
434                let resp_metrics = map
435                    .into_iter()
436                    .map(|(k, v)| (k, v.into()))
437                    .collect::<HashMap<_, _>>();
438
439                // TODO(dennis): may consume too much memory, adds flow control
440                Ok(PromStoreResponse {
441                    content_type: CONTENT_TYPE_PROTOBUF.clone(),
442                    content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
443                    resp_metrics,
444                    body: prom_store::snappy_compress(&response.encode_to_vec())?,
445                })
446            }
447            ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
448                feat: "streamed remote read",
449            }
450            .fail(),
451        }
452    }
453
454    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
455        todo!();
456    }
457}
458
459impl Instance {
460    async fn create_metric_physical_table_if_missing(
461        &self,
462        catalog: &str,
463        schema: &str,
464        physical_table: &str,
465        ctx: QueryContextRef,
466    ) -> ServerResult<()> {
467        let table = self
468            .catalog_manager()
469            .table(catalog, schema, physical_table, Some(ctx.as_ref()))
470            .await
471            .map_err(BoxedError::new)
472            .context(error::ExecuteGrpcQuerySnafu)?;
473        if table.is_some() {
474            return Ok(());
475        }
476
477        let table_ref = TableReference::full(catalog, schema, physical_table);
478        let default_schema = vec![
479            api::v1::ColumnSchema {
480                column_name: common_query::prelude::greptime_timestamp().to_string(),
481                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
482                semantic_type: api::v1::SemanticType::Timestamp as i32,
483                datatype_extension: None,
484                options: None,
485            },
486            api::v1::ColumnSchema {
487                column_name: common_query::prelude::greptime_value().to_string(),
488                datatype: api::v1::ColumnDataType::Float64 as i32,
489                semantic_type: api::v1::SemanticType::Field as i32,
490                datatype_extension: None,
491                options: None,
492            },
493        ];
494        let mut create_table_expr = build_create_table_expr(
495            &table_ref,
496            &default_schema,
497            common_catalog::consts::default_engine(),
498        )
499        .map_err(BoxedError::new)
500        .context(error::ExecuteGrpcQuerySnafu)?;
501        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
502        fill_metric_physical_table_options(&mut create_table_expr.table_options);
503
504        self.statement_executor
505            .create_table_inner(&mut create_table_expr, None, ctx)
506            .await
507            .map_err(BoxedError::new)
508            .context(error::ExecuteGrpcQuerySnafu)?;
509
510        Ok(())
511    }
512}
513
514/// This handler is mainly used for `frontend` or `standalone` to directly import
515/// the metrics collected by itself, thereby avoiding importing metrics through the network,
516/// thus reducing compression and network transmission overhead,
517/// so only implement `PromStoreProtocolHandler::write` method.
518pub struct ExportMetricHandler {
519    inserter: InserterRef,
520    statement_executor: Arc<StatementExecutor>,
521}
522
523impl ExportMetricHandler {
524    pub fn new_handler(
525        inserter: InserterRef,
526        statement_executor: Arc<StatementExecutor>,
527    ) -> PromStoreProtocolHandlerRef {
528        Arc::new(Self {
529            inserter,
530            statement_executor,
531        })
532    }
533}
534
535#[async_trait]
536impl PromStoreProtocolHandler for ExportMetricHandler {
537    async fn write(
538        &self,
539        request: RowInsertRequests,
540        ctx: QueryContextRef,
541        _: bool,
542    ) -> ServerResult<Output> {
543        self.inserter
544            .handle_metric_row_inserts(
545                request,
546                ctx,
547                &self.statement_executor,
548                GREPTIME_PHYSICAL_TABLE.to_string(),
549            )
550            .await
551            .map_err(BoxedError::new)
552            .context(error::ExecuteGrpcQuerySnafu)
553    }
554
555    async fn read(
556        &self,
557        _request: ReadRequest,
558        _ctx: QueryContextRef,
559    ) -> ServerResult<PromStoreResponse> {
560        unreachable!();
561    }
562
563    async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
564        unreachable!();
565    }
566}
567
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContext;

    use super::*;

    #[test]
    fn test_auto_create_table_type_for_prom_remote_write_metric_engine() {
        // The physical-table override from the query context must end up in
        // the logical create type.
        let mut query_ctx = QueryContext::with(
            common_catalog::consts::DEFAULT_CATALOG_NAME,
            common_catalog::consts::DEFAULT_SCHEMA_NAME,
        );
        query_ctx.set_extension(PHYSICAL_TABLE_PARAM, "metric_physical".to_string());
        let ctx = Arc::new(query_ctx);

        let AutoCreateTableType::Logical(physical) =
            auto_create_table_type_for_prom_remote_write(&ctx, true)
        else {
            panic!("expected logical table create type");
        };
        assert_eq!(physical, "metric_physical");
    }

    #[test]
    fn test_auto_create_table_type_for_prom_remote_write_without_metric_engine() {
        let ctx = Arc::new(QueryContext::with(
            common_catalog::consts::DEFAULT_CATALOG_NAME,
            common_catalog::consts::DEFAULT_SCHEMA_NAME,
        ));

        assert!(matches!(
            auto_create_table_type_for_prom_remote_write(&ctx, false),
            AutoCreateTableType::Physical
        ));
    }

    #[test]
    fn test_required_physical_table_for_create_type() {
        let logical = AutoCreateTableType::Logical("phy_table".to_string());
        assert_eq!(
            required_physical_table_for_create_type(&logical),
            Some("phy_table")
        );

        assert_eq!(
            required_physical_table_for_create_type(&AutoCreateTableType::Physical),
            None
        );
    }

    #[test]
    fn test_metric_physical_table_options_forces_flat_sst_format() {
        let mut table_options = HashMap::new();
        fill_metric_physical_table_options(&mut table_options);

        assert_eq!(
            table_options.get(SST_FORMAT_KEY).map(String::as_str),
            Some("flat")
        );
        assert_eq!(
            table_options
                .get(PHYSICAL_TABLE_METADATA_KEY)
                .map(String::as_str),
            Some("true")
        );
    }
}
635}