servers/http/
prom_store.rs1use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use axum::body::Bytes;
19use axum::extract::{Query, State};
20use axum::http::{header, HeaderValue, StatusCode};
21use axum::response::IntoResponse;
22use axum::Extension;
23use axum_extra::TypedHeader;
24use common_catalog::consts::DEFAULT_SCHEMA_NAME;
25use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
26use common_telemetry::tracing;
27use hyper::HeaderMap;
28use lazy_static::lazy_static;
29use object_pool::Pool;
30use pipeline::util::to_pipeline_version;
31use pipeline::{ContextReq, PipelineDefinition};
32use prost::Message;
33use serde::{Deserialize, Serialize};
34use session::context::{Channel, QueryContext};
35use snafu::prelude::*;
36
37use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
38use crate::http::extractor::PipelineInfo;
39use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
40use crate::http::PromValidationMode;
41use crate::prom_store::{snappy_decompress, zstd_decompress};
42use crate::proto::{PromSeriesProcessor, PromWriteRequest};
43use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
44
/// Key under which `remote_write` stores the caller-supplied physical table
/// name as a query-context extension.
pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
46lazy_static! {
47 static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest> =
48 Pool::new(256, PromWriteRequest::default);
49}
50
/// Name of the default remote-write payload encoding.
pub const DEFAULT_ENCODING: &str = "snappy";
/// Encoding name looked up in the `Content-Encoding` header by `remote_write`
/// to decide whether the payload should be treated as zstd-compressed.
pub const VM_ENCODING: &str = "zstd";
/// Response body `remote_write` sends when the request carries the
/// `get_vm_proto_version` query parameter.
pub const VM_PROTO_VERSION: &str = "1";
54
55#[derive(Clone)]
56pub struct PromStoreState {
57 pub prom_store_handler: PromStoreProtocolHandlerRef,
58 pub pipeline_handler: Option<PipelineHandlerRef>,
59 pub prom_store_with_metric_engine: bool,
60 pub prom_validation_mode: PromValidationMode,
61}
62
63#[derive(Debug, Serialize, Deserialize)]
64pub struct RemoteWriteQuery {
65 pub db: Option<String>,
66 pub physical_table: Option<String>,
69 pub get_vm_proto_version: Option<String>,
71}
72
73impl Default for RemoteWriteQuery {
74 fn default() -> RemoteWriteQuery {
75 Self {
76 db: Some(DEFAULT_SCHEMA_NAME.to_string()),
77 physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
78 get_vm_proto_version: None,
79 }
80 }
81}
82
83#[axum_macros::debug_handler]
84#[tracing::instrument(
85 skip_all,
86 fields(protocol = "prometheus", request_type = "remote_write")
87)]
88pub async fn remote_write(
89 State(state): State<PromStoreState>,
90 Query(params): Query<RemoteWriteQuery>,
91 Extension(mut query_ctx): Extension<QueryContext>,
92 pipeline_info: PipelineInfo,
93 content_encoding: TypedHeader<headers::ContentEncoding>,
94 body: Bytes,
95) -> Result<impl IntoResponse> {
96 let PromStoreState {
97 prom_store_handler,
98 pipeline_handler,
99 prom_store_with_metric_engine,
100 prom_validation_mode,
101 } = state;
102
103 if let Some(_vm_handshake) = params.get_vm_proto_version {
104 return Ok(VM_PROTO_VERSION.into_response());
105 }
106
107 let db = params.db.clone().unwrap_or_default();
108 query_ctx.set_channel(Channel::Prometheus);
109 if let Some(physical_table) = params.physical_table {
110 query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
111 }
112 let query_ctx = Arc::new(query_ctx);
113 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
114 .with_label_values(&[db.as_str()])
115 .start_timer();
116
117 let is_zstd = content_encoding.contains(VM_ENCODING);
118
119 let mut processor = PromSeriesProcessor::default_processor();
120 if let Some(pipeline_name) = pipeline_info.pipeline_name {
121 let pipeline_def = PipelineDefinition::from_name(
122 &pipeline_name,
123 to_pipeline_version(pipeline_info.pipeline_version.as_deref())
124 .context(PipelineSnafu)?,
125 None,
126 )
127 .context(PipelineSnafu)?;
128 let pipeline_handler = pipeline_handler.context(InternalSnafu {
129 err_msg: "pipeline handler is not set".to_string(),
130 })?;
131
132 processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
133 }
134
135 let req =
136 decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;
137
138 let mut cost = 0;
139 for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
140 let cnt: u64 = reqs
141 .inserts
142 .iter()
143 .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
144 .sum();
145 let output = prom_store_handler
146 .write(reqs, temp_ctx, prom_store_with_metric_engine)
147 .await?;
148 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(cnt);
149 cost += output.meta.cost;
150 }
151
152 Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
153}
154
155impl IntoResponse for PromStoreResponse {
156 fn into_response(self) -> axum::response::Response {
157 let mut header_map = HeaderMap::new();
158 header_map.insert(&header::CONTENT_TYPE, self.content_type);
159 header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
160
161 let metrics = if self.resp_metrics.is_empty() {
162 None
163 } else {
164 serde_json::to_string(&self.resp_metrics).ok()
165 };
166 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
167 header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
168 }
169
170 (header_map, self.body).into_response()
171 }
172}
173
174#[axum_macros::debug_handler]
175#[tracing::instrument(
176 skip_all,
177 fields(protocol = "prometheus", request_type = "remote_read")
178)]
179pub async fn remote_read(
180 State(state): State<PromStoreState>,
181 Query(params): Query<RemoteWriteQuery>,
182 Extension(mut query_ctx): Extension<QueryContext>,
183 body: Bytes,
184) -> Result<PromStoreResponse> {
185 let db = params.db.clone().unwrap_or_default();
186 query_ctx.set_channel(Channel::Prometheus);
187 let query_ctx = Arc::new(query_ctx);
188 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
189 .with_label_values(&[db.as_str()])
190 .start_timer();
191
192 let request = decode_remote_read_request(body).await?;
193
194 state.prom_store_handler.read(request, query_ctx).await
195}
196
197fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
198 Ok(Bytes::from(if is_zstd {
199 zstd_decompress(body)?
200 } else {
201 snappy_decompress(body)?
202 }))
203}
204
205async fn decode_remote_write_request(
206 is_zstd: bool,
207 body: Bytes,
208 prom_validation_mode: PromValidationMode,
209 processor: &mut PromSeriesProcessor,
210) -> Result<ContextReq> {
211 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
212
213 let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
220 buf
221 } else {
222 try_decompress(!is_zstd, &body[..])?
224 };
225
226 let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
227
228 request
229 .merge(buf, prom_validation_mode, processor)
230 .context(error::DecodePromRemoteRequestSnafu)?;
231
232 if processor.use_pipeline {
233 processor.exec_pipeline().await
234 } else {
235 let reqs = request.as_row_insert_requests();
236 Ok(ContextReq::default_opt_with_reqs(reqs))
237 }
238}
239
240async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
241 let buf = snappy_decompress(&body[..])?;
242
243 ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
244}