Skip to main content

servers/prom_remote_write/
mod.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Prometheus remote write support.
16//!
17//! This module groups validation, row building, and protobuf decoding for
18//! the Prometheus remote write API.
19
20pub mod decode;
21pub(crate) mod row_builder;
22pub(crate) mod types;
23#[cfg(any(test, feature = "testing"))]
24pub mod v2;
25#[cfg(not(any(test, feature = "testing")))]
26pub(crate) mod v2;
27pub mod validation;
28
29use bytes::Bytes;
30use lazy_static::lazy_static;
31use object_pool::Pool;
32use snafu::ResultExt;
33
34use crate::error;
35use crate::prom_remote_write::decode::{PromSeriesProcessor, PromWriteRequest};
36use crate::prom_remote_write::row_builder::TablesBuilder;
37use crate::prom_remote_write::validation::PromValidationMode;
38use crate::prom_store::{snappy_decompress, zstd_decompress};
39
40lazy_static! {
41    static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest<'static>> =
42        Pool::new(256, PromWriteRequest::default);
43}
44
45pub fn try_decompress(is_zstd: bool, body: &[u8]) -> crate::error::Result<Vec<u8>> {
46    if is_zstd {
47        zstd_decompress(body)
48    } else {
49        snappy_decompress(body)
50    }
51}
52
53pub fn decode_remote_write_request(
54    is_zstd: bool,
55    body: Bytes,
56    prom_validation_mode: PromValidationMode,
57    processor: &mut PromSeriesProcessor,
58) -> crate::error::Result<TablesBuilder<'static>> {
59    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
60
61    // due to vmagent's limitation, there is a chance that vmagent is
62    // sending content type wrong so we have to apply a fallback with decoding
63    // the content in another method.
64    //
65    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
66    // see https://github.com/GreptimeTeam/greptimedb/issues/3929
67    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
68        buf
69    } else {
70        // fallback to the other compression method
71        try_decompress(!is_zstd, &body[..])?
72    };
73
74    let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
75
76    request
77        .decode(buf, prom_validation_mode, processor)
78        .context(error::DecodePromRemoteRequestSnafu)?;
79    Ok(std::mem::take(&mut request.table_data))
80}