servers/prom_remote_write/
mod.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Prometheus remote write support.
16//!
17//! This module groups validation, row building, and protobuf decoding for
18//! the Prometheus remote write API.
19
20pub mod decode;
21pub(crate) mod row_builder;
22pub(crate) mod types;
23pub mod validation;
24
25use bytes::Bytes;
26use lazy_static::lazy_static;
27use object_pool::Pool;
28use snafu::ResultExt;
29
30use crate::error;
31use crate::prom_remote_write::decode::{PromSeriesProcessor, PromWriteRequest};
32use crate::prom_remote_write::row_builder::TablesBuilder;
33use crate::prom_remote_write::validation::PromValidationMode;
34use crate::prom_store::{snappy_decompress, zstd_decompress};
35
36lazy_static! {
37    static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest<'static>> =
38        Pool::new(256, PromWriteRequest::default);
39}
40
41pub fn try_decompress(is_zstd: bool, body: &[u8]) -> crate::error::Result<Vec<u8>> {
42    if is_zstd {
43        zstd_decompress(body)
44    } else {
45        snappy_decompress(body)
46    }
47}
48
49pub fn decode_remote_write_request(
50    is_zstd: bool,
51    body: Bytes,
52    prom_validation_mode: PromValidationMode,
53    processor: &mut PromSeriesProcessor,
54) -> crate::error::Result<TablesBuilder<'static>> {
55    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
56
57    // due to vmagent's limitation, there is a chance that vmagent is
58    // sending content type wrong so we have to apply a fallback with decoding
59    // the content in another method.
60    //
61    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
62    // see https://github.com/GreptimeTeam/greptimedb/issues/3929
63    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
64        buf
65    } else {
66        // fallback to the other compression method
67        try_decompress(!is_zstd, &body[..])?
68    };
69
70    let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
71
72    request
73        .decode(buf, prom_validation_mode, processor)
74        .context(error::DecodePromRemoteRequestSnafu)?;
75    Ok(std::mem::take(&mut request.table_data))
76}