servers/prom_remote_write/
mod.rs1pub mod decode;
21pub(crate) mod row_builder;
22pub(crate) mod types;
23pub mod validation;
24
25use bytes::Bytes;
26use lazy_static::lazy_static;
27use object_pool::Pool;
28use snafu::ResultExt;
29
30use crate::error;
31use crate::prom_remote_write::decode::{PromSeriesProcessor, PromWriteRequest};
32use crate::prom_remote_write::row_builder::TablesBuilder;
33use crate::prom_remote_write::validation::PromValidationMode;
34use crate::prom_store::{snappy_decompress, zstd_decompress};
35
36lazy_static! {
37 static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest<'static>> =
38 Pool::new(256, PromWriteRequest::default);
39}
40
41pub fn try_decompress(is_zstd: bool, body: &[u8]) -> crate::error::Result<Vec<u8>> {
42 if is_zstd {
43 zstd_decompress(body)
44 } else {
45 snappy_decompress(body)
46 }
47}
48
49pub fn decode_remote_write_request(
50 is_zstd: bool,
51 body: Bytes,
52 prom_validation_mode: PromValidationMode,
53 processor: &mut PromSeriesProcessor,
54) -> crate::error::Result<TablesBuilder<'static>> {
55 let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
56
57 let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
64 buf
65 } else {
66 try_decompress(!is_zstd, &body[..])?
68 };
69
70 let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
71
72 request
73 .decode(buf, prom_validation_mode, processor)
74 .context(error::DecodePromRemoteRequestSnafu)?;
75 Ok(std::mem::take(&mut request.table_data))
76}