servers/http/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use api::v1::RowInsertRequests;
19use axum::body::Bytes;
20use axum::extract::{Query, State};
21use axum::http::{header, HeaderValue, StatusCode};
22use axum::response::IntoResponse;
23use axum::Extension;
24use axum_extra::TypedHeader;
25use common_catalog::consts::DEFAULT_SCHEMA_NAME;
26use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
27use common_telemetry::tracing;
28use hyper::HeaderMap;
29use lazy_static::lazy_static;
30use object_pool::Pool;
31use prost::Message;
32use serde::{Deserialize, Serialize};
33use session::context::{Channel, QueryContext};
34use snafu::prelude::*;
35
36use crate::error::{self, Result};
37use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
38use crate::prom_store::{snappy_decompress, zstd_decompress};
39use crate::proto::PromWriteRequest;
40use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};
41
42pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
43lazy_static! {
44    static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest> =
45        Pool::new(256, PromWriteRequest::default);
46}
47
48pub const DEFAULT_ENCODING: &str = "snappy";
49pub const VM_ENCODING: &str = "zstd";
50pub const VM_PROTO_VERSION: &str = "1";
51
52#[derive(Clone)]
53pub struct PromStoreState {
54    pub prom_store_handler: PromStoreProtocolHandlerRef,
55    pub prom_store_with_metric_engine: bool,
56    pub is_strict_mode: bool,
57}
58
59#[derive(Debug, Serialize, Deserialize)]
60pub struct RemoteWriteQuery {
61    pub db: Option<String>,
62    /// Specify which physical table to use for storing metrics.
63    /// This only works on remote write requests.
64    pub physical_table: Option<String>,
65    /// For VictoriaMetrics modified remote write protocol
66    pub get_vm_proto_version: Option<String>,
67}
68
69impl Default for RemoteWriteQuery {
70    fn default() -> RemoteWriteQuery {
71        Self {
72            db: Some(DEFAULT_SCHEMA_NAME.to_string()),
73            physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
74            get_vm_proto_version: None,
75        }
76    }
77}
78
79#[axum_macros::debug_handler]
80#[tracing::instrument(
81    skip_all,
82    fields(protocol = "prometheus", request_type = "remote_write")
83)]
84pub async fn remote_write(
85    State(state): State<PromStoreState>,
86    Query(params): Query<RemoteWriteQuery>,
87    Extension(mut query_ctx): Extension<QueryContext>,
88    content_encoding: TypedHeader<headers::ContentEncoding>,
89    body: Bytes,
90) -> Result<impl IntoResponse> {
91    let PromStoreState {
92        prom_store_handler,
93        prom_store_with_metric_engine,
94        is_strict_mode,
95    } = state;
96
97    if let Some(_vm_handshake) = params.get_vm_proto_version {
98        return Ok(VM_PROTO_VERSION.into_response());
99    }
100
101    let db = params.db.clone().unwrap_or_default();
102    query_ctx.set_channel(Channel::Prometheus);
103    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
104        .with_label_values(&[db.as_str()])
105        .start_timer();
106
107    let is_zstd = content_encoding.contains(VM_ENCODING);
108    let (request, samples) = decode_remote_write_request(is_zstd, body, is_strict_mode).await?;
109
110    if let Some(physical_table) = params.physical_table {
111        query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
112    }
113    let query_ctx = Arc::new(query_ctx);
114
115    let output = prom_store_handler
116        .write(request, query_ctx, prom_store_with_metric_engine)
117        .await?;
118    crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
119    Ok((
120        StatusCode::NO_CONTENT,
121        write_cost_header_map(output.meta.cost),
122    )
123        .into_response())
124}
125
126impl IntoResponse for PromStoreResponse {
127    fn into_response(self) -> axum::response::Response {
128        let mut header_map = HeaderMap::new();
129        header_map.insert(&header::CONTENT_TYPE, self.content_type);
130        header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
131
132        let metrics = if self.resp_metrics.is_empty() {
133            None
134        } else {
135            serde_json::to_string(&self.resp_metrics).ok()
136        };
137        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
138            header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
139        }
140
141        (header_map, self.body).into_response()
142    }
143}
144
145#[axum_macros::debug_handler]
146#[tracing::instrument(
147    skip_all,
148    fields(protocol = "prometheus", request_type = "remote_read")
149)]
150pub async fn remote_read(
151    State(state): State<PromStoreState>,
152    Query(params): Query<RemoteWriteQuery>,
153    Extension(mut query_ctx): Extension<QueryContext>,
154    body: Bytes,
155) -> Result<PromStoreResponse> {
156    let db = params.db.clone().unwrap_or_default();
157    query_ctx.set_channel(Channel::Prometheus);
158    let query_ctx = Arc::new(query_ctx);
159    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
160        .with_label_values(&[db.as_str()])
161        .start_timer();
162
163    let request = decode_remote_read_request(body).await?;
164
165    state.prom_store_handler.read(request, query_ctx).await
166}
167
168fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
169    Ok(Bytes::from(if is_zstd {
170        zstd_decompress(body)?
171    } else {
172        snappy_decompress(body)?
173    }))
174}
175
176async fn decode_remote_write_request(
177    is_zstd: bool,
178    body: Bytes,
179    is_strict_mode: bool,
180) -> Result<(RowInsertRequests, usize)> {
181    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
182
183    // due to vmagent's limitation, there is a chance that vmagent is
184    // sending content type wrong so we have to apply a fallback with decoding
185    // the content in another method.
186    //
187    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
188    // see https://github.com/GreptimeTeam/greptimedb/issues/3929
189    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
190        buf
191    } else {
192        // fallback to the other compression method
193        try_decompress(!is_zstd, &body[..])?
194    };
195
196    let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
197    request
198        .merge(buf, is_strict_mode)
199        .context(error::DecodePromRemoteRequestSnafu)?;
200    Ok(request.as_row_insert_requests())
201}
202
203async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
204    let buf = snappy_decompress(&body[..])?;
205
206    ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
207}