servers/http/
prom_store.rs1use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use axum::Extension;
19use axum::body::Bytes;
20use axum::extract::{Query, State};
21use axum::http::{HeaderValue, StatusCode, header};
22use axum::response::IntoResponse;
23use axum_extra::TypedHeader;
24use common_catalog::consts::DEFAULT_SCHEMA_NAME;
25use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
26use common_telemetry::tracing;
27use hyper::HeaderMap;
28use pipeline::PipelineDefinition;
29use pipeline::util::to_pipeline_version;
30use prost::Message;
31use serde::{Deserialize, Serialize};
32use session::context::{Channel, QueryContext};
33use snafu::prelude::*;
34
35use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
36use crate::http::extractor::PipelineInfo;
37use crate::http::header::{GREPTIME_DB_HEADER_METRICS, write_cost_header_map};
38use crate::pending_rows_batcher::PendingRowsBatcher;
39use crate::prom_remote_write::decode::PromSeriesProcessor;
40use crate::prom_remote_write::decode_remote_write_request;
41use crate::prom_remote_write::validation::PromValidationMode;
42use crate::prom_store::{extract_schema_from_read_request, snappy_decompress};
43use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
44
45pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
46pub const DEFAULT_ENCODING: &str = "snappy";
47pub const VM_ENCODING: &str = "zstd";
48pub const VM_PROTO_VERSION: &str = "1";
49
50#[derive(Clone)]
51pub struct PromStoreState {
52 pub prom_store_handler: PromStoreProtocolHandlerRef,
53 pub pipeline_handler: Option<PipelineHandlerRef>,
54 pub prom_store_with_metric_engine: bool,
55 pub prom_validation_mode: PromValidationMode,
56 pub pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
57}
58
59#[derive(Debug, Serialize, Deserialize)]
60pub struct RemoteWriteQuery {
61 pub db: Option<String>,
62 pub physical_table: Option<String>,
65 pub get_vm_proto_version: Option<String>,
67}
68
69impl Default for RemoteWriteQuery {
70 fn default() -> RemoteWriteQuery {
71 Self {
72 db: Some(DEFAULT_SCHEMA_NAME.to_string()),
73 physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
74 get_vm_proto_version: None,
75 }
76 }
77}
78
79#[axum_macros::debug_handler]
80#[tracing::instrument(
81 skip_all,
82 fields(protocol = "prometheus", request_type = "remote_write")
83)]
84pub async fn remote_write(
85 State(state): State<PromStoreState>,
86 Query(params): Query<RemoteWriteQuery>,
87 Extension(mut query_ctx): Extension<QueryContext>,
88 pipeline_info: PipelineInfo,
89 content_encoding: TypedHeader<headers::ContentEncoding>,
90 body: Bytes,
91) -> Result<impl IntoResponse> {
92 let PromStoreState {
93 prom_store_handler,
94 pipeline_handler,
95 prom_store_with_metric_engine,
96 prom_validation_mode,
97 pending_rows_batcher,
98 } = state;
99
100 if let Some(_vm_handshake) = params.get_vm_proto_version {
101 return Ok(VM_PROTO_VERSION.into_response());
102 }
103
104 let db = params.db.clone().unwrap_or_default();
105 query_ctx.set_channel(Channel::Prometheus);
106 let physical_table = params
107 .physical_table
108 .clone()
109 .unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string());
110 query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table.clone());
111 let query_ctx = Arc::new(query_ctx);
112 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
113 .with_label_values(&[db.as_str()])
114 .start_timer();
115
116 let is_zstd = content_encoding.contains(VM_ENCODING);
117
118 let mut processor = PromSeriesProcessor::default_processor();
119
120 if let Some(pipeline_name) = pipeline_info.pipeline_name {
121 let pipeline_def = PipelineDefinition::from_name(
122 &pipeline_name,
123 to_pipeline_version(pipeline_info.pipeline_version.as_deref())
124 .context(PipelineSnafu)?,
125 None,
126 )
127 .context(PipelineSnafu)?;
128 let pipeline_handler = pipeline_handler.context(InternalSnafu {
129 err_msg: "pipeline handler is not set".to_string(),
130 })?;
131
132 processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
133 }
134
135 let mut req = decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor)?;
136
137 let req = if processor.use_pipeline {
138 processor.exec_pipeline().await?
139 } else {
140 req.as_insert_requests()
141 };
142
143 if prom_store_with_metric_engine && let Some(batcher) = pending_rows_batcher {
144 for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
145 prom_store_handler
146 .pre_write(&reqs, temp_ctx.clone())
147 .await?;
148 let rows = batcher.submit(reqs, temp_ctx).await?;
149 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
150 .with_label_values(&[db.as_str()])
151 .inc_by(rows);
152 }
153 return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
154 }
155
156 let mut cost = 0;
157 for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
158 let cnt: u64 = reqs
159 .inserts
160 .iter()
161 .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
162 .sum();
163 let output = prom_store_handler
164 .write(reqs, temp_ctx, prom_store_with_metric_engine)
165 .await?;
166 crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
167 .with_label_values(&[db.as_str()])
168 .inc_by(cnt);
169 cost += output.meta.cost;
170 }
171
172 Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
173}
174
175impl IntoResponse for PromStoreResponse {
176 fn into_response(self) -> axum::response::Response {
177 let mut header_map = HeaderMap::new();
178 header_map.insert(&header::CONTENT_TYPE, self.content_type);
179 header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
180
181 let metrics = if self.resp_metrics.is_empty() {
182 None
183 } else {
184 serde_json::to_string(&self.resp_metrics).ok()
185 };
186 if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
187 header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
188 }
189
190 (header_map, self.body).into_response()
191 }
192}
193
194#[axum_macros::debug_handler]
195#[tracing::instrument(
196 skip_all,
197 fields(protocol = "prometheus", request_type = "remote_read")
198)]
199pub async fn remote_read(
200 State(state): State<PromStoreState>,
201 Query(params): Query<RemoteWriteQuery>,
202 Extension(mut query_ctx): Extension<QueryContext>,
203 body: Bytes,
204) -> Result<PromStoreResponse> {
205 let db = params.db.clone().unwrap_or_default();
206 query_ctx.set_channel(Channel::Prometheus);
207
208 let request = decode_remote_read_request(body).await?;
209
210 if let Some(schema) = extract_schema_from_read_request(&request) {
212 query_ctx.set_current_schema(&schema);
213 }
214
215 let query_ctx = Arc::new(query_ctx);
216 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
217 .with_label_values(&[db.as_str()])
218 .start_timer();
219
220 state.prom_store_handler.read(request, query_ctx).await
221}
222
223async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
224 let buf = snappy_decompress(&body[..])?;
225
226 ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
227}