servers/prom_remote_write/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Shared types for Prometheus remote write decoding.
16
17use std::slice;
18
19use api::prom_store::remote::Sample;
20use bytes::Buf;
21use prost::DecodeError;
22use prost::encoding::{WireType, decode_varint};
23
24use crate::repeated_field::Clear;
25
/// Borrowed byte slice used for the label fields decoded in this file.
///
/// NOTE(review): the `'static` lifetime is not a real guarantee. `merge_bytes`
/// in this file builds these slices with `slice::from_raw_parts` over the
/// decode input buffer, so a `RawBytes` is only valid while that buffer is
/// alive and unmodified — confirm that callers uphold this.
pub type RawBytes = &'static [u8];
27
28impl Clear for Sample {
29    fn clear(&mut self) {
30        self.timestamp = 0;
31        self.value = 0.0;
32    }
33}
34
35#[derive(Default, Clone, Debug)]
36pub(crate) struct PromLabel {
37    pub name: RawBytes,
38    pub value: RawBytes,
39}
40
41impl Clear for PromLabel {
42    fn clear(&mut self) {
43        self.name.clear();
44        self.value.clear();
45    }
46}
47
48impl PromLabel {
49    pub(crate) fn merge_field(
50        &mut self,
51        tag: u32,
52        wire_type: WireType,
53        buf: &mut &[u8],
54    ) -> Result<(), DecodeError> {
55        const STRUCT_NAME: &str = "PromLabel";
56        match tag {
57            1u32 => {
58                let value = &mut self.name;
59                merge_bytes(value, buf).map_err(|mut error| {
60                    error.push(STRUCT_NAME, "name");
61                    error
62                })
63            }
64            2u32 => {
65                let value = &mut self.value;
66                merge_bytes(value, buf).map_err(|mut error| {
67                    error.push(STRUCT_NAME, "value");
68                    error
69                })
70            }
71            _ => prost::encoding::skip_field(wire_type, tag, buf, Default::default()),
72        }
73    }
74}
75
76/// Reads a variable-length encoded bytes field from `src` and assign it to `dst`.
77#[inline(always)]
78fn merge_bytes(dst: &mut RawBytes, src: &mut &[u8]) -> Result<(), DecodeError> {
79    let len = decode_varint(src)? as usize;
80    if len > src.remaining() {
81        return Err(DecodeError::new(format!(
82            "buffer underflow, len: {}, remaining: {}",
83            len,
84            src.remaining()
85        )));
86    }
87    *dst = unsafe { slice::from_raw_parts(src.as_ptr(), len) };
88    src.advance(len);
89    Ok(())
90}