1use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use axum::Extension;
19use axum::body::Bytes;
20use axum::extract::{Query, State};
21use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
22use axum::response::IntoResponse;
23use axum_extra::TypedHeader;
24use common_catalog::consts::DEFAULT_SCHEMA_NAME;
25use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
26use common_telemetry::tracing;
27use mime_guess::mime;
28use pipeline::util::to_pipeline_version;
29use pipeline::{ContextReq, PipelineDefinition};
30use prometheus::HistogramTimer;
31use prost::Message;
32use serde::{Deserialize, Serialize};
33use session::context::{Channel, QueryContext};
34use snafu::prelude::*;
35use table::requests::{
36 METADATA_QUALITY_INFERRED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_SIGNAL_TYPE,
37 SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS,
38};
39
40use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
41use crate::http::extractor::PipelineInfo;
42use crate::http::header::{
43 CONTENT_TYPE_PROTOBUF_STR, GREPTIME_DB_HEADER_METRICS, write_cost_header_map,
44};
45use crate::pending_rows_batcher::PendingRowsBatcher;
46use crate::prom_remote_write::decode::PromSeriesProcessor;
47use crate::prom_remote_write::decode_remote_write_request;
48use crate::prom_remote_write::v2::{RemoteWriteV2RequestExt, decode_remote_write_v2_request};
49use crate::prom_remote_write::validation::PromValidationMode;
50use crate::prom_store::{extract_schema_from_read_request, snappy_decompress};
51use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
52
/// Query-context extension key under which the physical table name is stored.
pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
/// `Content-Encoding` token for snappy; one of the two encodings accepted for remote write v2.
pub const DEFAULT_ENCODING: &str = "snappy";
/// `Content-Encoding` token for zstd; its presence is forwarded to the decoders as `is_zstd`.
pub const VM_ENCODING: &str = "zstd";
/// Response body sent when the `get_vm_proto_version` query parameter is present.
pub const VM_PROTO_VERSION: &str = "1";

/// Source-version value stamped into the query context for remote write v1 requests.
const REMOTE_WRITE_V1_VERSION: &str = "1.0";
/// Source-version value stamped into the query context for remote write v2 requests.
const REMOTE_WRITE_V2_VERSION: &str = "2.0";

/// Value of the `proto` content-type parameter that selects the v1 message.
const REMOTE_WRITE_V1_PROTO: &str = "prometheus.WriteRequest";
/// Value of the `proto` content-type parameter that selects the v2 message.
const REMOTE_WRITE_V2_PROTO: &str = "io.prometheus.write.v2.Request";
/// Name of the `Content-Type` parameter that carries the protobuf message name.
const CONTENT_TYPE_PROTO_PARAM: &str = "proto";

/// Response header reporting how many samples a v2 request wrote.
const REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER: &str = "x-prometheus-remote-write-samples-written";
/// Response header reporting how many histograms a v2 request wrote.
const REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER: &str =
    "x-prometheus-remote-write-histograms-written";
/// Response header reporting how many exemplars a v2 request wrote.
const REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER: &str =
    "x-prometheus-remote-write-exemplars-written";
67
68#[derive(Clone)]
69pub struct PromStoreState {
70 pub prom_store_handler: PromStoreProtocolHandlerRef,
71 pub pipeline_handler: Option<PipelineHandlerRef>,
72 pub prom_store_with_metric_engine: bool,
73 pub prom_validation_mode: PromValidationMode,
74 pub pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
75}
76
77#[derive(Debug, Serialize, Deserialize)]
78pub struct RemoteWriteQuery {
79 pub db: Option<String>,
80 pub physical_table: Option<String>,
83 pub get_vm_proto_version: Option<String>,
85}
86
87impl Default for RemoteWriteQuery {
88 fn default() -> RemoteWriteQuery {
89 Self {
90 db: Some(DEFAULT_SCHEMA_NAME.to_string()),
91 physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
92 get_vm_proto_version: None,
93 }
94 }
95}
96
97#[axum_macros::debug_handler]
98#[tracing::instrument(
99 skip_all,
100 fields(protocol = "prometheus", request_type = "remote_write")
101)]
102pub async fn remote_write(
103 State(state): State<PromStoreState>,
104 Query(params): Query<RemoteWriteQuery>,
105 Extension(query_ctx): Extension<QueryContext>,
106 content_type: Option<TypedHeader<headers::ContentType>>,
107 pipeline_info: PipelineInfo,
108 content_encoding: TypedHeader<headers::ContentEncoding>,
109 body: Bytes,
110) -> Result<axum::response::Response> {
111 let is_zstd = content_encoding.contains(VM_ENCODING);
112
113 match remote_write_proto(content_type) {
114 RemoteWriteProto::V1 => {
115 remote_write_v1(state, params, query_ctx, pipeline_info, is_zstd, body).await
116 }
117 RemoteWriteProto::V2 => {
118 if let Some(response) = unsupported_remote_write_v2_encoding_response(&content_encoding)
119 {
120 return Ok(response);
121 }
122 remote_write_v2(state, params, query_ctx, pipeline_info, is_zstd, body).await
123 }
124 RemoteWriteProto::Unsupported(content_type) => Ok((
125 StatusCode::UNSUPPORTED_MEDIA_TYPE,
126 format!("unsupported prometheus remote write content type: {content_type}"),
127 )
128 .into_response()),
129 }
130}
131
132async fn remote_write_v1(
133 state: PromStoreState,
134 params: RemoteWriteQuery,
135 query_ctx: QueryContext,
136 pipeline_info: PipelineInfo,
137 is_zstd: bool,
138 body: Bytes,
139) -> Result<axum::response::Response> {
140 let PromStoreState {
141 prom_store_handler,
142 pipeline_handler,
143 prom_store_with_metric_engine,
144 prom_validation_mode,
145 pending_rows_batcher,
146 } = state;
147
148 if let Some(response) = vm_proto_version_response(¶ms) {
149 return Ok(response);
150 }
151
152 let (db, query_ctx, _timer) =
153 prepare_remote_write_context(¶ms, query_ctx, REMOTE_WRITE_V1_VERSION);
154
155 let mut processor = PromSeriesProcessor::default_processor();
156
157 if let Some(pipeline_name) = pipeline_info.pipeline_name {
158 let pipeline_def = PipelineDefinition::from_name(
159 &pipeline_name,
160 to_pipeline_version(pipeline_info.pipeline_version.as_deref())
161 .context(PipelineSnafu)?,
162 None,
163 )
164 .context(PipelineSnafu)?;
165 let pipeline_handler = pipeline_handler.context(InternalSnafu {
166 err_msg: "pipeline handler is not set".to_string(),
167 })?;
168
169 processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
170 }
171
172 let mut req = decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor)?;
173
174 let req = if processor.use_pipeline {
175 processor.exec_pipeline().await?
176 } else {
177 req.as_insert_requests()
178 };
179
180 let outcome = write_prometheus_rows(
181 prom_store_handler,
182 pending_rows_batcher,
183 prom_store_with_metric_engine,
184 &db,
185 query_ctx,
186 req,
187 )
188 .await?;
189
190 Ok((
191 StatusCode::NO_CONTENT,
192 write_cost_header_map(outcome.write_cost),
193 )
194 .into_response())
195}
196
197async fn remote_write_v2(
198 state: PromStoreState,
199 params: RemoteWriteQuery,
200 query_ctx: QueryContext,
201 pipeline_info: PipelineInfo,
202 is_zstd: bool,
203 body: Bytes,
204) -> Result<axum::response::Response> {
205 let PromStoreState {
206 prom_store_handler,
207 pipeline_handler: _,
208 prom_store_with_metric_engine,
209 prom_validation_mode: _,
210 pending_rows_batcher,
211 } = state;
212
213 if let Some(response) = vm_proto_version_response(¶ms) {
214 return Ok(response);
215 }
216
217 let _ = pipeline_info;
220
221 let (db, query_ctx, _timer) =
222 prepare_remote_write_context(¶ms, query_ctx, REMOTE_WRITE_V2_VERSION);
223
224 let request = match decode_remote_write_v2_request(is_zstd, body) {
225 Ok(request) => request,
226 Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)),
227 };
228 let req = match request.into_context_req() {
229 Ok(req) => req,
230 Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)),
231 };
232
233 let outcome = match write_prometheus_rows_with_progress(
234 prom_store_handler,
235 pending_rows_batcher,
236 prom_store_with_metric_engine,
237 &db,
238 query_ctx,
239 req,
240 )
241 .await
242 {
243 Ok(outcome) => outcome,
244 Err(error) => {
245 return Ok(remote_write_v2_error_response(
246 error.error,
247 error.rows_written,
248 0,
249 0,
250 ));
251 }
252 };
253
254 let mut headers = write_cost_header_map(outcome.write_cost);
255 append_remote_write_v2_written_headers(&mut headers, outcome.rows_written, 0, 0);
256
257 Ok((StatusCode::NO_CONTENT, headers).into_response())
258}
259
260fn vm_proto_version_response(params: &RemoteWriteQuery) -> Option<axum::response::Response> {
261 params
262 .get_vm_proto_version
263 .as_ref()
264 .map(|_| VM_PROTO_VERSION.into_response())
265}
266
267fn prepare_remote_write_context(
268 params: &RemoteWriteQuery,
269 mut query_ctx: QueryContext,
270 remote_write_version: &str,
271) -> (String, Arc<QueryContext>, HistogramTimer) {
272 let db = params.db.clone().unwrap_or_default();
273 query_ctx.set_channel(Channel::Prometheus);
274 let physical_table = params
275 .physical_table
276 .clone()
277 .unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string());
278 query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
279 query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
284 query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS);
285 query_ctx.set_extension(SEMANTIC_SOURCE_VERSION, remote_write_version);
286 query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED);
287 let query_ctx = Arc::new(query_ctx);
288 let timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
289 .with_label_values(&[db.as_str()])
290 .start_timer();
291
292 (db, query_ctx, timer)
293}
294
/// Result of a successful remote write.
struct PromWriteOutcome {
    /// Sum of the `cost` values reported by the handler's writes; `0` on the batched path.
    write_cost: usize,
    /// Total number of rows written across all batches of the request.
    rows_written: u64,
}
299
300struct PromWriteError {
301 error: error::Error,
302 rows_written: u64,
303}
304
305async fn write_prometheus_rows(
306 prom_store_handler: PromStoreProtocolHandlerRef,
307 pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
308 prom_store_with_metric_engine: bool,
309 db: &str,
310 query_ctx: Arc<QueryContext>,
311 req: ContextReq,
312) -> Result<PromWriteOutcome> {
313 write_prometheus_rows_with_progress(
314 prom_store_handler,
315 pending_rows_batcher,
316 prom_store_with_metric_engine,
317 db,
318 query_ctx,
319 req,
320 )
321 .await
322 .map_err(|error| error.error)
323}
324
325async fn write_prometheus_rows_with_progress(
326 prom_store_handler: PromStoreProtocolHandlerRef,
327 pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
328 prom_store_with_metric_engine: bool,
329 db: &str,
330 query_ctx: Arc<QueryContext>,
331 req: ContextReq,
332) -> std::result::Result<PromWriteOutcome, PromWriteError> {
333 if prom_store_with_metric_engine && let Some(batcher) = pending_rows_batcher {
334 let mut rows_written = 0;
335 for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
336 prom_store_handler
337 .pre_write(&reqs, temp_ctx.clone())
338 .await
339 .map_err(|error| PromWriteError {
340 error,
341 rows_written,
342 })?;
343 let rows = batcher
344 .submit(reqs, temp_ctx)
345 .await
346 .map_err(|error| PromWriteError {
347 error,
348 rows_written,
349 })?;
350 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
351 .with_label_values(&[db])
352 .inc_by(rows);
353 rows_written += rows;
354 }
355 return Ok(PromWriteOutcome {
356 write_cost: 0,
357 rows_written,
358 });
359 }
360
361 let mut write_cost = 0;
362 let mut rows_written = 0;
363 for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
364 let cnt: u64 = reqs
365 .inserts
366 .iter()
367 .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
368 .sum();
369 let output = prom_store_handler
370 .write(reqs, temp_ctx, prom_store_with_metric_engine)
371 .await
372 .map_err(|error| PromWriteError {
373 error,
374 rows_written,
375 })?;
376 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
377 .with_label_values(&[db])
378 .inc_by(cnt);
379 write_cost += output.meta.cost;
380 rows_written += cnt;
381 }
382
383 Ok(PromWriteOutcome {
384 write_cost,
385 rows_written,
386 })
387}
388
389fn remote_write_v2_error_response(
390 error: error::Error,
391 samples: u64,
392 histograms: u64,
393 exemplars: u64,
394) -> axum::response::Response {
395 let mut response = error.into_response();
396 append_remote_write_v2_written_headers(response.headers_mut(), samples, histograms, exemplars);
397 response
398}
399
400fn append_remote_write_v2_written_headers(
401 headers: &mut HeaderMap,
402 samples: u64,
403 histograms: u64,
404 exemplars: u64,
405) {
406 headers.insert(
407 REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER,
408 HeaderValue::from_str(&samples.to_string()).expect("u64 header value is valid"),
409 );
410 headers.insert(
411 REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER,
412 HeaderValue::from_str(&histograms.to_string()).expect("u64 header value is valid"),
413 );
414 headers.insert(
415 REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER,
416 HeaderValue::from_str(&exemplars.to_string()).expect("u64 header value is valid"),
417 );
418}
419
420enum RemoteWriteProto {
421 V1,
422 V2,
423 Unsupported(mime::Mime),
424}
425
426fn remote_write_proto(content_type: Option<TypedHeader<headers::ContentType>>) -> RemoteWriteProto {
428 let Some(TypedHeader(content_type)) = content_type else {
429 return RemoteWriteProto::V1;
430 };
431
432 let mime_type: mime::Mime = content_type.into();
433 if !mime_type
434 .essence_str()
435 .eq_ignore_ascii_case(CONTENT_TYPE_PROTOBUF_STR)
436 {
437 return RemoteWriteProto::Unsupported(mime_type);
438 }
439
440 for (name, value) in mime_type.params() {
441 if !name.as_str().eq_ignore_ascii_case(CONTENT_TYPE_PROTO_PARAM) {
442 continue;
443 }
444
445 return match value.as_str() {
446 REMOTE_WRITE_V1_PROTO => RemoteWriteProto::V1,
447 REMOTE_WRITE_V2_PROTO => RemoteWriteProto::V2,
448 _ => RemoteWriteProto::Unsupported(mime_type.clone()),
449 };
450 }
451
452 RemoteWriteProto::V1
453}
454
455fn unsupported_remote_write_v2_encoding_response(
456 content_encoding: &headers::ContentEncoding,
457) -> Option<axum::response::Response> {
458 if content_encoding.contains(DEFAULT_ENCODING) || content_encoding.contains(VM_ENCODING) {
459 return None;
460 }
461
462 Some((
463 StatusCode::UNSUPPORTED_MEDIA_TYPE,
464 format!(
465 "unsupported prometheus remote write content encoding: only {DEFAULT_ENCODING} and {VM_ENCODING} are supported"
466 ),
467 )
468 .into_response())
469}
470
471impl IntoResponse for PromStoreResponse {
472 fn into_response(self) -> axum::response::Response {
473 let mut header_map = HeaderMap::new();
474 header_map.insert(&header::CONTENT_TYPE, self.content_type);
475 header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
476
477 let metrics = if self.resp_metrics.is_empty() {
478 None
479 } else {
480 serde_json::to_string(&self.resp_metrics).ok()
481 };
482 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
483 header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
484 }
485
486 (header_map, self.body).into_response()
487 }
488}
489
490#[axum_macros::debug_handler]
491#[tracing::instrument(
492 skip_all,
493 fields(protocol = "prometheus", request_type = "remote_read")
494)]
495pub async fn remote_read(
496 State(state): State<PromStoreState>,
497 Query(params): Query<RemoteWriteQuery>,
498 Extension(mut query_ctx): Extension<QueryContext>,
499 body: Bytes,
500) -> Result<PromStoreResponse> {
501 let db = params.db.clone().unwrap_or_default();
502 query_ctx.set_channel(Channel::Prometheus);
503
504 let request = decode_remote_read_request(body).await?;
505
506 if let Some(schema) = extract_schema_from_read_request(&request) {
508 query_ctx.set_current_schema(&schema);
509 }
510
511 let query_ctx = Arc::new(query_ctx);
512 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
513 .with_label_values(&[db.as_str()])
514 .start_timer();
515
516 state.prom_store_handler.read(request, query_ctx).await
517}
518
519async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
520 let buf = snappy_decompress(&body[..])?;
521
522 ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
523}
524
#[cfg(test)]
mod tests {
    use api::prom_store::remote::ReadRequest;
    use api::v1::RowInsertRequests;
    use async_trait::async_trait;
    use common_query::Output;
    use pipeline::GreptimePipelineParams;
    use session::context::{QueryContext, QueryContextRef};

    use super::*;
    use crate::prom_remote_write::validation::PromValidationMode;
    use crate::prom_store::Metrics;
    use crate::query_handler::PromStoreProtocolHandler;

    /// Handler stub whose methods must never be reached by the tests in this module.
    struct NoopPromStoreHandler;

    #[async_trait]
    impl PromStoreProtocolHandler for NoopPromStoreHandler {
        async fn write(
            &self,
            _request: RowInsertRequests,
            _ctx: QueryContextRef,
            _with_metric_engine: bool,
        ) -> Result<Output> {
            unreachable!("empty remote write v2 request should not write")
        }

        async fn read(
            &self,
            _request: ReadRequest,
            _ctx: QueryContextRef,
        ) -> Result<PromStoreResponse> {
            unimplemented!()
        }

        async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> {
            unimplemented!()
        }
    }

    /// Parses `value` into the typed `Content-Type` header expected by `remote_write_proto`.
    fn content_type(value: &str) -> Option<TypedHeader<headers::ContentType>> {
        let parsed: headers::ContentType = value.parse().unwrap();
        Some(TypedHeader(parsed))
    }

    /// State without pipeline handler or batcher, backed by the no-op handler.
    fn test_state() -> PromStoreState {
        PromStoreState {
            prom_store_handler: Arc::new(NoopPromStoreHandler),
            pipeline_handler: None,
            prom_store_with_metric_engine: false,
            prom_validation_mode: PromValidationMode::Strict,
            pending_rows_batcher: None,
        }
    }

    /// Pipeline info with the given name and otherwise default settings.
    fn pipeline_info(pipeline_name: Option<&str>) -> PipelineInfo {
        PipelineInfo {
            pipeline_name: pipeline_name.map(ToString::to_string),
            pipeline_version: None,
            pipeline_params: GreptimePipelineParams::default(),
        }
    }

    #[test]
    fn test_remote_write_proto() {
        // Plain, quoted and upper-cased media type all select v2.
        let v2_content_types = [
            "application/x-protobuf;proto=io.prometheus.write.v2.Request",
            "application/x-protobuf; proto=\"io.prometheus.write.v2.Request\"",
            "APPLICATION/X-PROTOBUF;proto=io.prometheus.write.v2.Request",
        ];
        for raw in v2_content_types {
            let proto = remote_write_proto(content_type(raw));
            assert!(matches!(proto, RemoteWriteProto::V2));
        }

        // A missing `proto` parameter, the v1 message name, and a missing header select v1.
        let v1_headers = [
            content_type("application/x-protobuf"),
            content_type("application/x-protobuf;proto=prometheus.WriteRequest"),
            None,
        ];
        for header in v1_headers {
            let proto = remote_write_proto(header);
            assert!(matches!(proto, RemoteWriteProto::V1));
        }

        // Unknown message names and non-protobuf media types are rejected.
        let unsupported_content_types = [
            "application/x-protobuf;proto=unknown.WriteRequest",
            "application/json;proto=io.prometheus.write.v2.Request",
        ];
        for raw in unsupported_content_types {
            let proto = remote_write_proto(content_type(raw));
            assert!(matches!(proto, RemoteWriteProto::Unsupported(_)));
        }
    }

    #[test]
    fn test_prepare_remote_write_context_stamps_semantics() {
        let (_, query_ctx, _timer) = prepare_remote_write_context(
            &RemoteWriteQuery::default(),
            QueryContext::with("greptime", "public"),
            REMOTE_WRITE_V2_VERSION,
        );

        let expected_extensions = [
            (SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC),
            (SEMANTIC_SOURCE, SOURCE_PROMETHEUS),
            (SEMANTIC_SOURCE_VERSION, REMOTE_WRITE_V2_VERSION),
            (SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED),
        ];
        for (key, value) in expected_extensions {
            assert_eq!(query_ctx.extension(key), Some(value));
        }
    }

    #[tokio::test]
    async fn test_remote_write_v2_ignores_pipeline() {
        // An empty v2 request: nothing to write, so the no-op handler is never called.
        let request = api::greptime_proto::io::prometheus::write::v2::Request {
            symbols: vec![String::new()],
            timeseries: Vec::new(),
        };
        let compressed = crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap();
        let body = Bytes::from(compressed);

        let response = remote_write_v2(
            test_state(),
            RemoteWriteQuery::default(),
            QueryContext::with("greptime", "public"),
            pipeline_info(Some("pipeline")),
            false,
            body,
        )
        .await
        .unwrap();

        assert_eq!(response.status(), StatusCode::NO_CONTENT);

        let samples_written = response
            .headers()
            .get(REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER)
            .map(|value| value.to_str().unwrap());
        assert_eq!(Some("0"), samples_written);
    }
}