servers/prom_remote_write/
row_builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::prom_store::remote::Sample;
18use api::v1::helper::{field_column_schema, time_index_column_schema};
19use api::v1::value::ValueData;
20use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
21use common_query::prelude::{greptime_timestamp, greptime_value};
22use pipeline::{ContextOpt, ContextReq};
23use prost::DecodeError;
24
25use crate::prom_remote_write::PromValidationMode;
26use crate::prom_remote_write::types::PromLabel;
27use crate::prom_remote_write::validation::validate_label_name;
28use crate::repeated_field::Clear;
29
/// Routing context that remote-write tables are grouped under.
///
/// Each distinct context becomes one `ContextOpt` in
/// `TablesBuilder::as_insert_requests`; a `None` field means the matching
/// `ContextOpt` setter is simply not called.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct PromCtx {
    /// Target schema, forwarded via `ContextOpt::set_schema` when present.
    pub schema: Option<String>,
    /// Physical table, forwarded via `ContextOpt::set_physical_table` when present.
    pub physical_table: Option<String>,
}
35
36#[derive(Default, Debug)]
37pub struct TablesBuilder<'a> {
38    pub tables: HashMap<PromCtx, HashMap<String, TableBuilder<'a>>>,
39    pub(crate) raw_data: Vec<u8>,
40}
41
42impl<'a> Clear for TablesBuilder<'a> {
43    fn clear(&mut self) {
44        self.tables.clear();
45        self.raw_data.clear();
46    }
47}
48
49impl<'a> TablesBuilder<'a> {
50    pub(crate) fn get_or_create_table_builder(
51        &mut self,
52        prom_ctx: PromCtx,
53        table_name: String,
54        label_num: usize,
55        row_num: usize,
56    ) -> &mut TableBuilder<'a> {
57        self.tables
58            .entry(prom_ctx)
59            .or_default()
60            .entry(table_name)
61            .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
62    }
63
64    pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
65        self.tables
66            .drain()
67            .map(|(prom, mut tables)| {
68                let mut opt = ContextOpt::default();
69                if let Some(physical_table) = prom.physical_table {
70                    opt.set_physical_table(physical_table);
71                }
72                if let Some(schema) = prom.schema {
73                    opt.set_schema(schema);
74                }
75
76                let mut ctx_req = ContextReq::default();
77                let reqs = tables
78                    .drain()
79                    .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
80                ctx_req.add_rows(opt, reqs);
81
82                ctx_req
83            })
84            .fold(ContextReq::default(), |mut req, reqs| {
85                req.merge(reqs);
86                req
87            })
88    }
89
90    pub(crate) fn set_raw_data(&mut self, buf: Vec<u8>) {
91        self.raw_data = buf;
92    }
93}
94
95#[derive(Debug)]
96pub struct TableBuilder<'a> {
97    schema: Vec<ColumnSchema>,
98    rows: Vec<Row>,
99    col_indexes: HashMap<&'a [u8], usize>,
100}
101
102impl<'a> Default for TableBuilder<'a> {
103    fn default() -> Self {
104        Self::with_capacity(2, 0)
105    }
106}
107
108impl<'a> TableBuilder<'a> {
109    pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
110        let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
111        col_indexes.insert(greptime_timestamp().as_bytes(), 0);
112        col_indexes.insert(greptime_value().as_bytes(), 1);
113
114        let mut schema = Vec::with_capacity(cols);
115        schema.push(time_index_column_schema(
116            greptime_timestamp(),
117            ColumnDataType::TimestampMillisecond,
118        ));
119        schema.push(field_column_schema(
120            greptime_value(),
121            ColumnDataType::Float64,
122        ));
123
124        Self {
125            schema,
126            rows: Vec::with_capacity(rows),
127            col_indexes,
128        }
129    }
130
131    pub(crate) fn add_labels_and_samples(
132        &mut self,
133        labels: &[PromLabel],
134        samples: &[Sample],
135        prom_validation_mode: PromValidationMode,
136    ) -> Result<(), DecodeError> {
137        let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
138
139        for PromLabel { name, value } in labels {
140            if !validate_label_name(name) {
141                return Err(DecodeError::new(format!(
142                    "Invalid label name: `{}`",
143                    String::from_utf8_lossy(name)
144                )));
145            }
146            let raw_tag_name = *name;
147            let tag_value = Some(ValueData::StringValue(
148                prom_validation_mode.decode_string(value)?,
149            ));
150            let tag_num = self.col_indexes.len();
151
152            if let Some(e) = self.col_indexes.get_mut(raw_tag_name) {
153                row[*e].value_data = tag_value;
154                continue;
155            }
156
157            let tag_name = unsafe { std::str::from_utf8_unchecked(raw_tag_name) };
158            self.schema.push(ColumnSchema {
159                column_name: tag_name.to_owned(),
160                datatype: ColumnDataType::String as i32,
161                semantic_type: SemanticType::Tag as i32,
162                ..Default::default()
163            });
164            self.col_indexes.insert(raw_tag_name, tag_num);
165
166            row.push(Value {
167                value_data: tag_value,
168            });
169        }
170
171        if samples.len() == 1 {
172            let sample = &samples[0];
173            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
174            row[1].value_data = Some(ValueData::F64Value(sample.value));
175            self.rows.push(Row { values: row });
176            return Ok(());
177        }
178        for sample in samples {
179            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
180            row[1].value_data = Some(ValueData::F64Value(sample.value));
181            self.rows.push(Row {
182                values: row.clone(),
183            });
184        }
185
186        Ok(())
187    }
188
189    pub fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
190        let mut rows = std::mem::take(&mut self.rows);
191        let schema = std::mem::take(&mut self.schema);
192        let col_num = schema.len();
193        for row in &mut rows {
194            if row.values.len() < col_num {
195                row.values.resize(col_num, Value { value_data: None });
196            }
197        }
198
199        RowInsertRequest {
200            table_name,
201            rows: Some(Rows { schema, rows }),
202        }
203    }
204
205    pub fn tags(&self) -> impl Iterator<Item = &String> {
206        self.schema
207            .iter()
208            .filter(|v| v.semantic_type == SemanticType::Tag as i32)
209            .map(|c| &c.column_name)
210    }
211}
212
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use prost::DecodeError;

    use super::*;

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();
        builder
            .add_labels_and_samples(
                &[
                    PromLabel {
                        name: b"tag0",
                        value: b"v0",
                    },
                    PromLabel {
                        name: b"tag1",
                        value: b"v1",
                    },
                ],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .expect("first series should be accepted");

        // The second series reuses `tag0` and introduces a new `tag2` column.
        builder
            .add_labels_and_samples(
                &[
                    PromLabel {
                        name: b"tag0",
                        value: b"v0",
                    },
                    PromLabel {
                        name: b"tag2",
                        value: b"v2",
                    },
                ],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .expect("second series should be accepted");

        let tags: Vec<&str> = builder.tags().map(String::as_str).collect();
        assert_eq!(tags, ["tag0", "tag1", "tag2"]);

        let request = builder.as_row_insert_request("test".to_string());
        assert_eq!(request.table_name, "test");
        let Rows { schema, rows } = request.rows.unwrap();
        // timestamp + value + tag0 + tag1 + tag2
        assert_eq!(5, schema.len());
        assert_eq!(2, rows.len());
        // The first row predates `tag2`; it must have been padded to full width.
        assert!(rows.iter().all(|row| row.values.len() == schema.len()));
        assert_eq!(rows[0].values[4].value_data, None);
        assert_eq!(rows[1].values[3].value_data, None);
        assert_eq!(
            rows[1].values[0].value_data,
            Some(ValueData::TimestampMillisecondValue(1))
        );
        assert_eq!(
            rows[1].values[4].value_data,
            Some(ValueData::StringValue("v2".to_string()))
        );

        let invalid_utf8_bytes = &[0xFF, 0xFF, 0xFF];
        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: b"tag0",
                value: invalid_utf8_bytes,
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}