servers/prom_remote_write/
mod.rs1pub mod decode;
21pub(crate) mod row_builder;
22pub(crate) mod types;
23#[cfg(any(test, feature = "testing"))]
24pub mod v2;
25#[cfg(not(any(test, feature = "testing")))]
26pub(crate) mod v2;
27pub mod validation;
28
29use bytes::Bytes;
30use lazy_static::lazy_static;
31use object_pool::Pool;
32use snafu::ResultExt;
33
34use crate::error;
35use crate::prom_remote_write::decode::{PromSeriesProcessor, PromWriteRequest};
36use crate::prom_remote_write::row_builder::TablesBuilder;
37use crate::prom_remote_write::validation::PromValidationMode;
38use crate::prom_store::{snappy_decompress, zstd_decompress};
39
40lazy_static! {
41 static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest<'static>> =
42 Pool::new(256, PromWriteRequest::default);
43}
44
45pub fn try_decompress(is_zstd: bool, body: &[u8]) -> crate::error::Result<Vec<u8>> {
46 if is_zstd {
47 zstd_decompress(body)
48 } else {
49 snappy_decompress(body)
50 }
51}
52
53pub fn decode_remote_write_request(
54 is_zstd: bool,
55 body: Bytes,
56 prom_validation_mode: PromValidationMode,
57 processor: &mut PromSeriesProcessor,
58) -> crate::error::Result<TablesBuilder<'static>> {
59 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
60
61 let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
68 buf
69 } else {
70 try_decompress(!is_zstd, &body[..])?
72 };
73
74 let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
75
76 request
77 .decode(buf, prom_validation_mode, processor)
78 .context(error::DecodePromRemoteRequestSnafu)?;
79 Ok(std::mem::take(&mut request.table_data))
80}