servers/http/
prom_store.rs1use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use axum::body::Bytes;
19use axum::extract::{Query, State};
20use axum::http::{header, HeaderValue, StatusCode};
21use axum::response::IntoResponse;
22use axum::Extension;
23use axum_extra::TypedHeader;
24use common_catalog::consts::DEFAULT_SCHEMA_NAME;
25use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
26use common_telemetry::tracing;
27use hyper::HeaderMap;
28use lazy_static::lazy_static;
29use object_pool::Pool;
30use pipeline::util::to_pipeline_version;
31use pipeline::PipelineDefinition;
32use prost::Message;
33use serde::{Deserialize, Serialize};
34use session::context::{Channel, QueryContext};
35use snafu::prelude::*;
36
37use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
38use crate::http::extractor::PipelineInfo;
39use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
40use crate::http::PromValidationMode;
41use crate::prom_row_builder::TablesBuilder;
42use crate::prom_store::{extract_schema_from_read_request, snappy_decompress, zstd_decompress};
43use crate::proto::{PromSeriesProcessor, PromWriteRequest};
44use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
45
/// Query-context extension key under which `remote_write` stores the value of
/// the `physical_table` query parameter.
pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
47lazy_static! {
48 static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest> =
49 Pool::new(256, PromWriteRequest::default);
50}
51
/// Name of the default content encoding for remote-write bodies.
pub const DEFAULT_ENCODING: &str = "snappy";

/// Content-encoding name that makes `remote_write` try zstd decompression
/// first. NOTE(review): "VM" presumably stands for VictoriaMetrics — confirm.
pub const VM_ENCODING: &str = "zstd";

/// Response body returned by `remote_write` when the `get_vm_proto_version`
/// query parameter is present.
pub const VM_PROTO_VERSION: &str = "1";
55
56#[derive(Clone)]
57pub struct PromStoreState {
58 pub prom_store_handler: PromStoreProtocolHandlerRef,
59 pub pipeline_handler: Option<PipelineHandlerRef>,
60 pub prom_store_with_metric_engine: bool,
61 pub prom_validation_mode: PromValidationMode,
62}
63
64#[derive(Debug, Serialize, Deserialize)]
65pub struct RemoteWriteQuery {
66 pub db: Option<String>,
67 pub physical_table: Option<String>,
70 pub get_vm_proto_version: Option<String>,
72}
73
74impl Default for RemoteWriteQuery {
75 fn default() -> RemoteWriteQuery {
76 Self {
77 db: Some(DEFAULT_SCHEMA_NAME.to_string()),
78 physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
79 get_vm_proto_version: None,
80 }
81 }
82}
83
84#[axum_macros::debug_handler]
85#[tracing::instrument(
86 skip_all,
87 fields(protocol = "prometheus", request_type = "remote_write")
88)]
89pub async fn remote_write(
90 State(state): State<PromStoreState>,
91 Query(params): Query<RemoteWriteQuery>,
92 Extension(mut query_ctx): Extension<QueryContext>,
93 pipeline_info: PipelineInfo,
94 content_encoding: TypedHeader<headers::ContentEncoding>,
95 body: Bytes,
96) -> Result<impl IntoResponse> {
97 let PromStoreState {
98 prom_store_handler,
99 pipeline_handler,
100 prom_store_with_metric_engine,
101 prom_validation_mode,
102 } = state;
103
104 if let Some(_vm_handshake) = params.get_vm_proto_version {
105 return Ok(VM_PROTO_VERSION.into_response());
106 }
107
108 let db = params.db.clone().unwrap_or_default();
109 query_ctx.set_channel(Channel::Prometheus);
110 if let Some(physical_table) = params.physical_table {
111 query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
112 }
113 let query_ctx = Arc::new(query_ctx);
114 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
115 .with_label_values(&[db.as_str()])
116 .start_timer();
117
118 let is_zstd = content_encoding.contains(VM_ENCODING);
119
120 let mut processor = PromSeriesProcessor::default_processor();
121
122 if let Some(pipeline_name) = pipeline_info.pipeline_name {
123 let pipeline_def = PipelineDefinition::from_name(
124 &pipeline_name,
125 to_pipeline_version(pipeline_info.pipeline_version.as_deref())
126 .context(PipelineSnafu)?,
127 None,
128 )
129 .context(PipelineSnafu)?;
130 let pipeline_handler = pipeline_handler.context(InternalSnafu {
131 err_msg: "pipeline handler is not set".to_string(),
132 })?;
133
134 processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
135 }
136
137 let mut req = decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor)?;
138
139 let req = if processor.use_pipeline {
140 processor.exec_pipeline().await?
141 } else {
142 req.as_insert_requests()
143 };
144
145 let mut cost = 0;
146 for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
147 let cnt: u64 = reqs
148 .inserts
149 .iter()
150 .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
151 .sum();
152 let output = prom_store_handler
153 .write(reqs, temp_ctx, prom_store_with_metric_engine)
154 .await?;
155 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
156 .with_label_values(&[db.as_str()])
157 .inc_by(cnt);
158 cost += output.meta.cost;
159 }
160
161 Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
162}
163
164impl IntoResponse for PromStoreResponse {
165 fn into_response(self) -> axum::response::Response {
166 let mut header_map = HeaderMap::new();
167 header_map.insert(&header::CONTENT_TYPE, self.content_type);
168 header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
169
170 let metrics = if self.resp_metrics.is_empty() {
171 None
172 } else {
173 serde_json::to_string(&self.resp_metrics).ok()
174 };
175 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
176 header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
177 }
178
179 (header_map, self.body).into_response()
180 }
181}
182
183#[axum_macros::debug_handler]
184#[tracing::instrument(
185 skip_all,
186 fields(protocol = "prometheus", request_type = "remote_read")
187)]
188pub async fn remote_read(
189 State(state): State<PromStoreState>,
190 Query(params): Query<RemoteWriteQuery>,
191 Extension(mut query_ctx): Extension<QueryContext>,
192 body: Bytes,
193) -> Result<PromStoreResponse> {
194 let db = params.db.clone().unwrap_or_default();
195 query_ctx.set_channel(Channel::Prometheus);
196
197 let request = decode_remote_read_request(body).await?;
198
199 if let Some(schema) = extract_schema_from_read_request(&request) {
201 query_ctx.set_current_schema(&schema);
202 }
203
204 let query_ctx = Arc::new(query_ctx);
205 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
206 .with_label_values(&[db.as_str()])
207 .start_timer();
208
209 state.prom_store_handler.read(request, query_ctx).await
210}
211
212fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
213 Ok(Bytes::from(if is_zstd {
214 zstd_decompress(body)?
215 } else {
216 snappy_decompress(body)?
217 }))
218}
219
220pub fn decode_remote_write_request(
221 is_zstd: bool,
222 body: Bytes,
223 prom_validation_mode: PromValidationMode,
224 processor: &mut PromSeriesProcessor,
225) -> Result<TablesBuilder> {
226 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
227
228 let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
235 buf
236 } else {
237 try_decompress(!is_zstd, &body[..])?
239 };
240
241 let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
242
243 request
244 .merge(buf, prom_validation_mode, processor)
245 .context(error::DecodePromRemoteRequestSnafu)?;
246 Ok(std::mem::take(&mut request.table_data))
247}
248
249async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
250 let buf = snappy_decompress(&body[..])?;
251
252 ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
253}