1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::prom_store::remote::read_request::ResponseType;
19use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
20use api::v1::alter_table_expr::Kind;
21use api::v1::{
22 AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef, CreateTableExpr,
23 RowInsertRequests, SemanticType,
24};
25use async_trait::async_trait;
26use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
27use client::OutputData;
28use common_catalog::format_full_table_name;
29use common_error::ext::BoxedError;
30use common_query::Output;
31use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
32use common_recordbatch::RecordBatches;
33use common_telemetry::{debug, tracing};
34use operator::insert::{
35 AutoCreateTableType, InserterRef, build_create_table_expr, fill_table_options_for_create,
36};
37use operator::statement::StatementExecutor;
38use prost::Message;
39use servers::error::{self, AuthSnafu, Result as ServerResult};
40use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics};
41use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
42use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
43use servers::pending_rows_batcher::PendingRowsSchemaAlterer;
44use servers::prom_store::{self, Metrics};
45use servers::query_handler::{
46 PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
47};
48use session::context::QueryContextRef;
49use snafu::{OptionExt, ResultExt};
50use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
51use store_api::mito_engine_options::SST_FORMAT_KEY;
52use table::table_reference::TableReference;
53use tracing::instrument;
54
55use crate::error::{
56 CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
57 TableNotFoundSnafu,
58};
59use crate::instance::Instance;
60
/// Raw `i32` value of [`ResponseType::Samples`], compared against the raw response type
/// values carried by a remote read request.
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
62
63fn auto_create_table_type_for_prom_remote_write(
64 ctx: &QueryContextRef,
65 with_metric_engine: bool,
66) -> AutoCreateTableType {
67 if with_metric_engine {
68 let physical_table = ctx
69 .extension(PHYSICAL_TABLE_PARAM)
70 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
71 .to_string();
72 AutoCreateTableType::Logical(physical_table)
73 } else {
74 AutoCreateTableType::Physical
75 }
76}
77
78fn required_physical_table_for_create_type(create_type: &AutoCreateTableType) -> Option<&str> {
79 match create_type {
80 AutoCreateTableType::Logical(physical_table) => Some(physical_table.as_str()),
81 _ => None,
82 }
83}
84
85fn fill_metric_physical_table_options(table_options: &mut HashMap<String, String>) {
86 table_options.insert(SST_FORMAT_KEY.to_string(), "flat".to_string());
88 table_options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
89}
90
/// Tells whether the raw `response_type` value is one this server can produce, which is
/// only the samples response type.
#[inline]
fn is_supported(response_type: i32) -> bool {
    matches!(response_type, SAMPLES_RESPONSE_TYPE)
}
96
97fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
103 if accepted_response_types.is_empty() {
104 return Ok(ResponseType::Samples);
105 }
106
107 let response_type = accepted_response_types
108 .iter()
109 .find(|t| is_supported(**t))
110 .with_context(|| error::NotSupportedSnafu {
111 feat: format!(
112 "server does not support any of the requested response types: {accepted_response_types:?}",
113 ),
114 })?;
115
116 Ok(ResponseType::try_from(*response_type).unwrap())
118}
119
120#[instrument(skip_all, fields(table_name))]
121async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
122 let OutputData::Stream(stream) = output.data else {
123 unreachable!()
124 };
125 let recordbatches = RecordBatches::try_collect(stream)
126 .await
127 .context(error::CollectRecordbatchSnafu)?;
128 Ok(QueryResult {
129 timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
130 })
131}
132
133impl Instance {
134 #[tracing::instrument(skip_all)]
135 async fn handle_remote_query(
136 &self,
137 ctx: &QueryContextRef,
138 catalog_name: &str,
139 schema_name: &str,
140 table_name: &str,
141 query: &Query,
142 ) -> Result<Output> {
143 let table = self
144 .catalog_manager
145 .table(catalog_name, schema_name, table_name, Some(ctx))
146 .await
147 .context(CatalogSnafu)?
148 .with_context(|| TableNotFoundSnafu {
149 table_name: format_full_table_name(catalog_name, schema_name, table_name),
150 })?;
151
152 let dataframe = self
153 .query_engine
154 .read_table(table)
155 .with_context(|_| ReadTableSnafu {
156 table_name: format_full_table_name(catalog_name, schema_name, table_name),
157 })?;
158
159 let logical_plan =
160 prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
161
162 debug!(
163 "Prometheus remote read, table: {}, logical plan: {}",
164 table_name,
165 logical_plan.display_indent(),
166 );
167
168 self.query_engine
169 .execute(logical_plan, ctx.clone())
170 .await
171 .context(ExecLogicalPlanSnafu)
172 }
173
174 #[tracing::instrument(skip_all)]
175 async fn handle_remote_queries(
176 &self,
177 ctx: QueryContextRef,
178 queries: &[Query],
179 ) -> ServerResult<Vec<(String, Output)>> {
180 let mut results = Vec::with_capacity(queries.len());
181
182 let catalog_name = ctx.current_catalog();
183 let schema_name = ctx.current_schema();
184
185 for query in queries {
186 let table_name = prom_store::table_name(query)?;
187
188 let output = self
189 .handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
190 .await
191 .map_err(BoxedError::new)
192 .context(error::ExecuteQuerySnafu)?;
193
194 results.push((table_name, output));
195 }
196 Ok(results)
197 }
198}
199
200#[async_trait]
201impl PendingRowsSchemaAlterer for Instance {
202 async fn create_tables_if_missing_batch(
203 &self,
204 catalog: &str,
205 schema: &str,
206 tables: &[(&str, &[api::v1::ColumnSchema])],
207 with_metric_engine: bool,
208 ctx: QueryContextRef,
209 ) -> ServerResult<()> {
210 if tables.is_empty() {
211 return Ok(());
212 }
213
214 let create_type = auto_create_table_type_for_prom_remote_write(&ctx, with_metric_engine);
215 if let Some(physical_table) = required_physical_table_for_create_type(&create_type) {
216 self.create_metric_physical_table_if_missing(
217 catalog,
218 schema,
219 physical_table,
220 ctx.clone(),
221 )
222 .await?;
223 }
224
225 let engine = if matches!(create_type, AutoCreateTableType::Logical(_)) {
226 METRIC_ENGINE_NAME
227 } else {
228 common_catalog::consts::default_engine()
229 };
230
231 let mut create_exprs: Vec<CreateTableExpr> = Vec::with_capacity(tables.len());
234 for &(table_name, request_schema) in tables {
235 let existing = self
236 .catalog_manager()
237 .table(catalog, schema, table_name, Some(ctx.as_ref()))
238 .await
239 .map_err(BoxedError::new)
240 .context(error::ExecuteGrpcQuerySnafu)?;
241 if existing.is_some() {
242 continue;
243 }
244
245 let table_ref = TableReference::full(catalog, schema, table_name);
246 let mut create_table_expr = build_create_table_expr(&table_ref, request_schema, engine)
247 .map_err(BoxedError::new)
248 .context(error::ExecuteGrpcQuerySnafu)?;
249
250 let mut table_options = std::collections::HashMap::with_capacity(4);
251 fill_table_options_for_create(&mut table_options, &create_type, &ctx);
252 create_table_expr.table_options.extend(table_options);
253 create_exprs.push(create_table_expr);
254 }
255
256 if create_exprs.is_empty() {
257 return Ok(());
258 }
259
260 match create_type {
261 AutoCreateTableType::Logical(_) => {
262 self.statement_executor
264 .create_logical_tables(&create_exprs, ctx)
265 .await
266 .map_err(BoxedError::new)
267 .context(error::ExecuteGrpcQuerySnafu)?;
268 }
269 AutoCreateTableType::Physical => {
270 for mut expr in create_exprs {
272 expr.table_options
273 .insert(SST_FORMAT_KEY.to_string(), "flat".to_string());
274 self.statement_executor
275 .create_table_inner(&mut expr, None, ctx.clone())
276 .await
277 .map_err(BoxedError::new)
278 .context(error::ExecuteGrpcQuerySnafu)?;
279 }
280 }
281 create_type => {
282 return error::InvalidPromRemoteRequestSnafu {
283 msg: format!(
284 "prom remote write only supports logical or physical auto-create: {}",
285 create_type.as_str()
286 ),
287 }
288 .fail();
289 }
290 }
291
292 Ok(())
293 }
294
295 async fn add_missing_prom_tag_columns_batch(
296 &self,
297 catalog: &str,
298 schema: &str,
299 tables: &[(&str, &[String])],
300 ctx: QueryContextRef,
301 ) -> ServerResult<()> {
302 if tables.is_empty() {
303 return Ok(());
304 }
305
306 let alter_exprs: Vec<AlterTableExpr> = tables
307 .iter()
308 .filter(|(_, columns)| !columns.is_empty())
309 .map(|&(table_name, columns)| {
310 let add_columns = AddColumns {
311 add_columns: columns
312 .iter()
313 .map(|column_name| AddColumn {
314 column_def: Some(ColumnDef {
315 name: column_name.clone(),
316 data_type: ColumnDataType::String as i32,
317 is_nullable: true,
318 semantic_type: SemanticType::Tag as i32,
319 comment: String::new(),
320 ..Default::default()
321 }),
322 location: None,
323 add_if_not_exists: true,
324 })
325 .collect(),
326 };
327
328 AlterTableExpr {
329 catalog_name: catalog.to_string(),
330 schema_name: schema.to_string(),
331 table_name: table_name.to_string(),
332 kind: Some(Kind::AddColumns(add_columns)),
333 }
334 })
335 .collect();
336
337 if alter_exprs.is_empty() {
338 return Ok(());
339 }
340
341 self.statement_executor
342 .alter_logical_tables(alter_exprs, ctx)
343 .await
344 .map_err(BoxedError::new)
345 .context(error::ExecuteGrpcQuerySnafu)?;
346
347 Ok(())
348 }
349}
350
351#[async_trait]
352impl PromStoreProtocolHandler for Instance {
353 async fn pre_write(
354 &self,
355 request: &RowInsertRequests,
356 ctx: QueryContextRef,
357 ) -> ServerResult<()> {
358 self.plugins
359 .get::<PermissionCheckerRef>()
360 .as_ref()
361 .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
362 .context(AuthSnafu)?;
363 let interceptor_ref = self
364 .plugins
365 .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
366 interceptor_ref.pre_write(request, ctx)?;
367 Ok(())
368 }
369
370 async fn write(
371 &self,
372 request: RowInsertRequests,
373 ctx: QueryContextRef,
374 with_metric_engine: bool,
375 ) -> ServerResult<Output> {
376 self.pre_write(&request, ctx.clone()).await?;
377
378 let output = if with_metric_engine {
379 let physical_table = ctx
380 .extension(PHYSICAL_TABLE_PARAM)
381 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
382 .to_string();
383 self.handle_metric_row_inserts(request, ctx.clone(), physical_table.clone())
384 .await
385 .map_err(BoxedError::new)
386 .context(error::ExecuteGrpcQuerySnafu)?
387 } else {
388 self.handle_row_inserts(request, ctx.clone(), true, true)
389 .await
390 .map_err(BoxedError::new)
391 .context(error::ExecuteGrpcQuerySnafu)?
392 };
393
394 Ok(output)
395 }
396
397 #[instrument(skip_all, fields(table_name))]
398 async fn read(
399 &self,
400 request: ReadRequest,
401 ctx: QueryContextRef,
402 ) -> ServerResult<PromStoreResponse> {
403 self.plugins
404 .get::<PermissionCheckerRef>()
405 .as_ref()
406 .check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
407 .context(AuthSnafu)?;
408 let interceptor_ref = self
409 .plugins
410 .get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
411 interceptor_ref.pre_read(&request, ctx.clone())?;
412
413 let response_type = negotiate_response_type(&request.accepted_response_types)?;
414
415 let results = self.handle_remote_queries(ctx, &request.queries).await?;
417
418 match response_type {
419 ResponseType::Samples => {
420 let mut query_results = Vec::with_capacity(results.len());
421 let mut map = HashMap::new();
422 for (table_name, output) in results {
423 let plan = output.meta.plan.clone();
424 query_results.push(to_query_result(&table_name, output).await?);
425 if let Some(ref plan) = plan {
426 collect_plan_metrics(plan, &mut [&mut map]);
427 }
428 }
429
430 let response = ReadResponse {
431 results: query_results,
432 };
433
434 let resp_metrics = map
435 .into_iter()
436 .map(|(k, v)| (k, v.into()))
437 .collect::<HashMap<_, _>>();
438
439 Ok(PromStoreResponse {
441 content_type: CONTENT_TYPE_PROTOBUF.clone(),
442 content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
443 resp_metrics,
444 body: prom_store::snappy_compress(&response.encode_to_vec())?,
445 })
446 }
447 ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
448 feat: "streamed remote read",
449 }
450 .fail(),
451 }
452 }
453
454 async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
455 todo!();
456 }
457}
458
459impl Instance {
460 async fn create_metric_physical_table_if_missing(
461 &self,
462 catalog: &str,
463 schema: &str,
464 physical_table: &str,
465 ctx: QueryContextRef,
466 ) -> ServerResult<()> {
467 let table = self
468 .catalog_manager()
469 .table(catalog, schema, physical_table, Some(ctx.as_ref()))
470 .await
471 .map_err(BoxedError::new)
472 .context(error::ExecuteGrpcQuerySnafu)?;
473 if table.is_some() {
474 return Ok(());
475 }
476
477 let table_ref = TableReference::full(catalog, schema, physical_table);
478 let default_schema = vec![
479 api::v1::ColumnSchema {
480 column_name: common_query::prelude::greptime_timestamp().to_string(),
481 datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
482 semantic_type: api::v1::SemanticType::Timestamp as i32,
483 datatype_extension: None,
484 options: None,
485 },
486 api::v1::ColumnSchema {
487 column_name: common_query::prelude::greptime_value().to_string(),
488 datatype: api::v1::ColumnDataType::Float64 as i32,
489 semantic_type: api::v1::SemanticType::Field as i32,
490 datatype_extension: None,
491 options: None,
492 },
493 ];
494 let mut create_table_expr = build_create_table_expr(
495 &table_ref,
496 &default_schema,
497 common_catalog::consts::default_engine(),
498 )
499 .map_err(BoxedError::new)
500 .context(error::ExecuteGrpcQuerySnafu)?;
501 create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
502 fill_metric_physical_table_options(&mut create_table_expr.table_options);
503
504 self.statement_executor
505 .create_table_inner(&mut create_table_expr, None, ctx)
506 .await
507 .map_err(BoxedError::new)
508 .context(error::ExecuteGrpcQuerySnafu)?;
509
510 Ok(())
511 }
512}
513
/// Prometheus store handler whose `write` inserts rows through an inserter into the
/// `GREPTIME_PHYSICAL_TABLE` physical table. Its `read` and `ingest_metrics` are marked
/// `unreachable!`.
pub struct ExportMetricHandler {
    /// Performs the metric row inserts.
    inserter: InserterRef,
    /// Handed to the inserter by reference on every write.
    statement_executor: Arc<StatementExecutor>,
}
522
523impl ExportMetricHandler {
524 pub fn new_handler(
525 inserter: InserterRef,
526 statement_executor: Arc<StatementExecutor>,
527 ) -> PromStoreProtocolHandlerRef {
528 Arc::new(Self {
529 inserter,
530 statement_executor,
531 })
532 }
533}
534
535#[async_trait]
536impl PromStoreProtocolHandler for ExportMetricHandler {
537 async fn write(
538 &self,
539 request: RowInsertRequests,
540 ctx: QueryContextRef,
541 _: bool,
542 ) -> ServerResult<Output> {
543 self.inserter
544 .handle_metric_row_inserts(
545 request,
546 ctx,
547 &self.statement_executor,
548 GREPTIME_PHYSICAL_TABLE.to_string(),
549 )
550 .await
551 .map_err(BoxedError::new)
552 .context(error::ExecuteGrpcQuerySnafu)
553 }
554
555 async fn read(
556 &self,
557 _request: ReadRequest,
558 _ctx: QueryContextRef,
559 ) -> ServerResult<PromStoreResponse> {
560 unreachable!();
561 }
562
563 async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
564 unreachable!();
565 }
566}
567
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContext;

    use super::*;

    /// With the metric engine, the create type is logical and names the physical table set
    /// through the `PHYSICAL_TABLE_PARAM` extension.
    #[test]
    fn test_auto_create_table_type_for_prom_remote_write_metric_engine() {
        let mut query_ctx = QueryContext::with(
            common_catalog::consts::DEFAULT_CATALOG_NAME,
            common_catalog::consts::DEFAULT_SCHEMA_NAME,
        );
        query_ctx.set_extension(PHYSICAL_TABLE_PARAM, "metric_physical".to_string());
        let ctx = Arc::new(query_ctx);

        let AutoCreateTableType::Logical(physical) =
            auto_create_table_type_for_prom_remote_write(&ctx, true)
        else {
            panic!("expected logical table create type");
        };
        assert_eq!(physical, "metric_physical");
    }

    /// Without the metric engine, the create type is physical.
    #[test]
    fn test_auto_create_table_type_for_prom_remote_write_without_metric_engine() {
        let ctx = Arc::new(QueryContext::with(
            common_catalog::consts::DEFAULT_CATALOG_NAME,
            common_catalog::consts::DEFAULT_SCHEMA_NAME,
        ));

        let create_type = auto_create_table_type_for_prom_remote_write(&ctx, false);
        assert!(
            matches!(create_type, AutoCreateTableType::Physical),
            "expected physical table create type"
        );
    }

    /// Only a logical create type reports a required physical table.
    #[test]
    fn test_required_physical_table_for_create_type() {
        let logical = AutoCreateTableType::Logical("phy_table".to_string());
        let from_logical = required_physical_table_for_create_type(&logical);
        assert_eq!(Some("phy_table"), from_logical);

        let physical = AutoCreateTableType::Physical;
        let from_physical = required_physical_table_for_create_type(&physical);
        assert_eq!(None, from_physical);
    }

    /// The physical table options carry the flat SST format and the physical table marker.
    #[test]
    fn test_metric_physical_table_options_forces_flat_sst_format() {
        let mut table_options = HashMap::new();

        fill_metric_physical_table_options(&mut table_options);

        let sst_format = table_options.get(SST_FORMAT_KEY).map(|v| v.as_str());
        assert_eq!(Some("flat"), sst_format);

        let physical_marker = table_options
            .get(PHYSICAL_TABLE_METADATA_KEY)
            .map(|v| v.as_str());
        assert_eq!(Some("true"), physical_marker);
    }
}